1mod checker;
2mod context;
3mod error;
4mod event;
5mod graph;
6mod id;
7mod language;
8mod memory;
9mod registry;
10mod repository;
11
12pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
13pub use error::{ContextError, RepositoryError, RuntimeError};
14pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
15pub use graph::{
16 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
17 GraphOperation, sorted_update_fields,
18};
19pub(crate) use id::local_id_generator;
20pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
21pub use language::{
22 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
23};
24pub use memory::{MemoryRepository, MemoryRepositoryError};
25pub use registry::{
26 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
27 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
28 RuntimeModule,
29};
30pub use repository::{
31 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
32 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
33};
34
35#[cfg(feature = "sqlx")]
36pub mod sqlx_support;
37
38#[cfg(test)]
39mod tests {
40 use std::collections::{BTreeMap, VecDeque};
41 use std::sync::{Arc, Mutex};
42
43 use super::{
44 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
45 Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
46 GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
47 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
48 InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
49 QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
50 RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
51 };
52 use teaql_core::{
53 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
54 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
55 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
56 };
57 use teaql_dialect_pg::PostgresDialect;
58 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
59 use teaql_sql::CompiledQuery;
60
// Quoted column list interpolated into the SELECT statement that
// `user_context_records_configured_sql_logs` expects in the SQL log.
61 const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
62
63 fn entity() -> EntityDescriptor {
64 EntityDescriptor::new("Order")
65 .table_name("orders")
66 .property(
67 PropertyDescriptor::new("id", DataType::U64)
68 .column_name("id")
69 .id()
70 .not_null(),
71 )
72 .property(
73 PropertyDescriptor::new("version", DataType::I64)
74 .column_name("version")
75 .version()
76 .not_null(),
77 )
78 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
79 .relation(
80 teaql_core::RelationDescriptor::new("lines", "OrderLine")
81 .local_key("id")
82 .foreign_key("order_id")
83 .many(),
84 )
85 }
86
87 fn line_entity() -> EntityDescriptor {
88 EntityDescriptor::new("OrderLine")
89 .table_name("orderline")
90 .property(
91 PropertyDescriptor::new("id", DataType::U64)
92 .column_name("id")
93 .id()
94 .not_null(),
95 )
96 .property(
97 PropertyDescriptor::new("version", DataType::I64)
98 .column_name("version")
99 .version(),
100 )
101 .property(
102 PropertyDescriptor::new("order_id", DataType::U64)
103 .column_name("order_id")
104 .not_null(),
105 )
106 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
107 .property(
108 PropertyDescriptor::new("product_id", DataType::U64)
109 .column_name("product_id")
110 .not_null(),
111 )
112 .relation(
113 teaql_core::RelationDescriptor::new("product", "Product")
114 .local_key("product_id")
115 .foreign_key("id"),
116 )
117 }
118
119 fn product_entity() -> EntityDescriptor {
120 EntityDescriptor::new("Product")
121 .table_name("product")
122 .property(
123 PropertyDescriptor::new("id", DataType::U64)
124 .column_name("id")
125 .id()
126 .not_null(),
127 )
128 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
129 }
130
// Executor stub for the tests. Its `QueryExecutor` impl ignores the query and returns a
// clone of `rows` from `fetch_all` and `affected` from `execute`.
131 #[derive(Debug, Default)]
132 struct StubExecutor {
// Row count reported by every `execute` call.
133 affected: u64,
// Rows handed back (cloned) by every `fetch_all` call.
134 rows: Vec<Record>,
135 }
136
// Executor stub that serves a different row batch per `fetch_all` call and records the
// SQL text of every query it is asked to fetch.
137 #[derive(Debug, Default)]
138 struct QueueExecutor {
// Row count reported by every `execute` call.
139 affected: u64,
// Batches returned by successive `fetch_all` calls, front first; an exhausted queue
// yields an empty batch.
140 rows: Mutex<VecDeque<Vec<Record>>>,
// SQL text of each query passed to `fetch_all`, in call order.
141 queries: Mutex<Vec<String>>,
142 }
143
// Behavior registered for `Order` (and `CatalogProduct`) in the tests. Its
// `RepositoryBehavior` impl sets a `version = 1` select filter, defaults `version` to 1
// on insert, and reports the `lines` relation load.
144 struct OrderBehavior;
145
// Derived entity declared as `CatalogProduct` / table `catalog_product`; the module
// registration tests use it as a type parameter only.
// NOTE(review): `dead_code` is presumably allowed because the fields are never read
// directly - confirm.
146 #[allow(dead_code)]
147 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
148 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
149 struct CatalogProductRow {
// Field marked as the entity id.
150 #[teaql(id)]
151 id: u64,
152 name: String,
153 }
154
// Derived entity declared as `OrderAggregate` on table `orders`, with an id plus a
// `dynamic` value map.
155 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
156 #[teaql(entity = "OrderAggregate", table = "orders")]
157 struct OrderAggregateDynamic {
// Field marked as the entity id.
158 #[teaql(id)]
159 id: u64,
// NOTE(review): presumably receives values that have no dedicated field (e.g. aggregate
// results) - confirm against the derive macro.
160 #[teaql(dynamic)]
161 dynamic: BTreeMap<String, Value>,
162 }
163
// Derived entity declared as `Product` / table `product`; it is the relation target of
// `OrderLineEntityRow::product`.
164 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
165 #[teaql(entity = "Product", table = "product")]
166 struct ProductEntityRow {
// Field marked as the entity id.
167 #[teaql(id)]
168 id: u64,
169 name: String,
170 }
171
// Derived entity declared as `OrderLine` / table `orderline`, with required keys and an
// optional nested product.
172 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
173 #[teaql(entity = "OrderLine", table = "orderline")]
174 struct OrderLineEntityRow {
// Field marked as the entity id.
175 #[teaql(id)]
176 id: u64,
// Explicitly mapped to column `order_id`.
177 #[teaql(column = "order_id")]
178 order_id: u64,
179 name: String,
// Explicitly mapped to column `product_id`.
180 #[teaql(column = "product_id")]
181 product_id: u64,
// Relation to `Product`, declared with local key `product_id` and foreign key `id`.
182 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
183 product: Option<ProductEntityRow>,
184 }
185
// Derived entity declared as `Order` / table `orders`, carrying its lines as a nested list.
186 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
187 #[teaql(entity = "Order", table = "orders")]
188 struct OrderAggregateRow {
// Field marked as the entity id.
189 #[teaql(id)]
190 id: u64,
// Field marked as the version column.
191 #[teaql(version)]
192 version: i64,
193 name: String,
// Relation to `OrderLine`, declared `many` with local key `id` and foreign key `order_id`.
194 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
195 lines: teaql_core::SmartList<OrderLineEntityRow>,
196 }
197
// Derived entity declared as `Order` / table `orders` with no relations; registered via
// `crate::module!(Order)` in the SQL log test.
// NOTE(review): the SELECT asserted in that test lists id, version, name in this field
// order - presumably derived from the declaration order; keep the fields in place.
198 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
199 #[teaql(entity = "Order", table = "orders")]
200 struct Order {
// Field marked as the entity id.
201 #[teaql(id)]
202 id: u64,
// Field marked as the version column.
203 #[teaql(version)]
204 version: i64,
205 name: String,
206 }
207
// `Product` variant used by the typed graph-save tests; the id is optional and the tests
// construct it with `id: None`.
208 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
209 #[teaql(entity = "Product", table = "product")]
210 struct TypedGraphProduct {
// Field marked as the entity id; `None` in the graph-save tests.
211 #[teaql(id)]
212 id: Option<u64>,
213 name: String,
214 }
215
// `OrderLine` variant used by the typed graph-save tests; the id and both key columns are
// optional and the tests construct them as `None`.
216 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
217 #[teaql(entity = "OrderLine", table = "orderline")]
218 struct TypedGraphLine {
// Field marked as the entity id.
219 #[teaql(id)]
220 id: Option<u64>,
// Explicitly mapped to column `order_id`; the tests assert it equals the parent order id
// after saving.
221 #[teaql(column = "order_id")]
222 order_id: Option<u64>,
223 name: String,
// Explicitly mapped to column `product_id`; the tests assert it equals the nested
// product id after saving.
224 #[teaql(column = "product_id")]
225 product_id: Option<u64>,
// Relation to `Product`, declared with local key `product_id` and foreign key `id`.
226 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
227 product: Option<TypedGraphProduct>,
228 }
229
// `Order` variant used as the root of the typed graph-save tests; the id is optional.
230 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
231 #[teaql(entity = "Order", table = "orders")]
232 struct TypedGraphOrder {
// Field marked as the entity id; `None` in the graph-save tests.
233 #[teaql(id)]
234 id: Option<u64>,
// Field marked as the version column.
235 #[teaql(version)]
236 version: i64,
237 name: String,
// Relation to `OrderLine`, declared `many` with local key `id` and foreign key `order_id`.
238 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
239 lines: teaql_core::SmartList<TypedGraphLine>,
240 }
241
// Hand-written `Order` entity: `TeaqlEntity` and `Entity` are implemented manually below
// rather than derived.
242 #[derive(Debug, PartialEq, Eq)]
243 struct OrderEntity {
244 id: u64,
245 version: i64,
246 name: String,
247 }
248
249 impl teaql_core::TeaqlEntity for OrderEntity {
250 fn entity_descriptor() -> EntityDescriptor {
251 entity()
252 }
253 }
254
255 impl Entity for OrderEntity {
256 fn from_record(record: Record) -> Result<Self, EntityError> {
257 let id = match record.get("id") {
258 Some(Value::U64(v)) => *v,
259 Some(Value::I64(v)) if *v >= 0 => *v as u64,
260 other => {
261 return Err(EntityError::new(
262 "Order",
263 format!("invalid id field: {other:?}"),
264 ));
265 }
266 };
267 let version = match record.get("version") {
268 Some(Value::I64(v)) => *v,
269 other => {
270 return Err(EntityError::new(
271 "Order",
272 format!("invalid version field: {other:?}"),
273 ));
274 }
275 };
276 let name = match record.get("name") {
277 Some(Value::Text(v)) => v.clone(),
278 other => {
279 return Err(EntityError::new(
280 "Order",
281 format!("invalid name field: {other:?}"),
282 ));
283 }
284 };
285 Ok(Self { id, version, name })
286 }
287
288 fn into_record(self) -> Record {
289 Record::from([
290 (String::from("id"), Value::U64(self.id)),
291 (String::from("version"), Value::I64(self.version)),
292 (String::from("name"), Value::Text(self.name)),
293 ])
294 }
295 }
296
/// Error type required by the stub executors' `QueryExecutor::Error`; the stubs in this
/// module always return `Ok`, so it is never actually produced.
#[derive(Debug)]
struct StubError;

impl std::fmt::Display for StubError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("stub error")
    }
}

impl std::error::Error for StubError {}
307
308 impl QueryExecutor for StubExecutor {
309 type Error = StubError;
310
311 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
312 Ok(self.rows.clone())
313 }
314
315 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
316 Ok(self.affected)
317 }
318
319 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
320 Ok(GraphTransactionBoundary::Started)
321 }
322 }
323
324 impl QueryExecutor for QueueExecutor {
325 type Error = StubError;
326
327 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
328 self.queries.lock().unwrap().push(query.sql.clone());
329 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
330 }
331
332 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
333 Ok(self.affected)
334 }
335 }
336
/// With `SqlLogOptions::select_only` a select plus an insert leave exactly one log entry
/// (the SELECT, with the literal inlined in `debug_sql`); after switching to
/// `mutation_only` and clearing the log, an update leaves exactly one UPDATE entry.
#[test]
fn user_context_records_configured_sql_logs() {
    let stub = StubExecutor {
        affected: 1,
        rows: Vec::new(),
    };
    let mut ctx = UserContext::new()
        .with_module(crate::module!(Order))
        .with_sql_log_options(SqlLogOptions::select_only());
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(stub);

    // The repository lives in its own scope so it is gone before `ctx` is mutated below.
    {
        let repo = ctx
            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
            .unwrap();
        let select = SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop"));
        repo.fetch_all(&select).unwrap();
        let insert = InsertCommand::new("Order").value("name", "created");
        repo.insert(&insert).unwrap();
    }

    let select_logs = ctx.sql_logs();
    assert_eq!(select_logs.len(), 1);
    assert_eq!(select_logs[0].operation, SqlLogOperation::Select);
    let expected_sql = format!(
        "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
    );
    assert_eq!(select_logs[0].debug_sql, expected_sql);

    ctx.set_sql_log_options(SqlLogOptions::mutation_only());
    ctx.clear_sql_logs();
    let update = UpdateCommand::new("Order", 1_u64)
        .value("name", "updated")
        .expected_version(1);
    ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap()
        .update(&update)
        .unwrap();

    let mutation_logs = ctx.sql_logs();
    assert_eq!(mutation_logs.len(), 1);
    assert_eq!(mutation_logs[0].operation, SqlLogOperation::Update);
    let update_sql = &mutation_logs[0].debug_sql;
    assert!(update_sql.contains("UPDATE \"orders\" SET"));
    assert!(update_sql.contains("'updated'"));
}
385
386 impl RepositoryBehavior for OrderBehavior {
387 fn before_select(
388 &self,
389 _ctx: &UserContext,
390 query: &mut teaql_core::SelectQuery,
391 ) -> Result<(), RuntimeError> {
392 query.filter = Some(Expr::eq("version", 1_i64));
393 Ok(())
394 }
395
396 fn before_insert(
397 &self,
398 _ctx: &UserContext,
399 command: &mut InsertCommand,
400 ) -> Result<(), RuntimeError> {
401 command
402 .values
403 .entry("version".to_owned())
404 .or_insert(Value::I64(1));
405 Ok(())
406 }
407
408 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
409 vec!["lines".to_owned()]
410 }
411 }
412
// Behavior whose `before_insert` fills `name` and `version` from context resources and
// locals (`tenant`, `initial_version`, `trace_id`).
413 struct ContextAwareOrderBehavior;
// Checker for the `Order` entity; see its `Checker` impl for the rules it applies.
414 struct OrderChecker;
// Event sink that appends a clone of every received event to a shared vector so the test
// can inspect the events afterwards.
415 #[derive(Clone)]
416 struct RecordingEventSink {
// Shared with the test body, which keeps its own `Arc` handle to read the events.
417 events: Arc<Mutex<Vec<EntityEvent>>>,
418 }
419
420 impl RepositoryBehavior for ContextAwareOrderBehavior {
421 fn before_insert(
422 &self,
423 ctx: &UserContext,
424 command: &mut InsertCommand,
425 ) -> Result<(), RuntimeError> {
426 let tenant = ctx
427 .get_named_resource::<String>("tenant")
428 .cloned()
429 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
430 let version = *ctx
431 .get_named_resource::<i64>("initial_version")
432 .ok_or_else(|| {
433 RuntimeError::Behavior("missing initial_version resource".to_owned())
434 })?;
435 let trace_id = match ctx.local("trace_id") {
436 Some(Value::Text(value)) => value.clone(),
437 other => {
438 return Err(RuntimeError::Behavior(format!(
439 "missing trace_id local, got {other:?}"
440 )));
441 }
442 };
443
444 command
445 .values
446 .entry("name".to_owned())
447 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
448 command
449 .values
450 .entry("version".to_owned())
451 .or_insert(Value::I64(version));
452 Ok(())
453 }
454 }
455
456 impl Checker for OrderChecker {
457 fn entity(&self) -> &str {
458 "Order"
459 }
460
461 fn check_and_fix(
462 &self,
463 _ctx: &UserContext,
464 record: &mut Record,
465 location: &ObjectLocation,
466 results: &mut CheckResults,
467 ) {
468 let status = CheckObjectStatus::from_record(record);
469 if status.is_create() {
470 self.required(record, "name", location, results);
471 record.entry("version".to_owned()).or_insert(Value::I64(1));
472 }
473 if status.is_update()
474 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
475 {
476 record.insert(
477 "name".to_owned(),
478 Value::Text("graph-update-checked".to_owned()),
479 );
480 }
481 self.min_string_length(record, "name", 3, location, results);
482 }
483 }
484
485 impl EntityEventSink for RecordingEventSink {
486 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
487 self.events.lock().unwrap().push(event.clone());
488 Ok(())
489 }
490 }
491
492 struct FixedIdGenerator(u64);
493
494 impl InternalIdGenerator for FixedIdGenerator {
495 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
496 Ok(self.0)
497 }
498 }
499
500 struct SequentialIdGenerator {
501 next: Mutex<u64>,
502 }
503
504 impl SequentialIdGenerator {
505 fn new(next: u64) -> Self {
506 Self {
507 next: Mutex::new(next),
508 }
509 }
510 }
511
512 impl InternalIdGenerator for SequentialIdGenerator {
513 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
514 let mut next = self
515 .next
516 .lock()
517 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
518 let id = *next;
519 *next += 1;
520 Ok(id)
521 }
522 }
523
/// An entity added with `with_entity` can be looked up by name.
#[test]
fn metadata_store_registers_entities() {
    let descriptor = entity();
    let store = InMemoryMetadataStore::new().with_entity(descriptor);

    let registered = store.entity("Order");
    assert!(registered.is_some());
}
529
/// A module built from a raw descriptor exposes both the entity and a repository for it.
#[test]
fn runtime_module_registers_descriptor_into_context() {
    let module = RuntimeModule::new().descriptor(entity());
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("Order").is_some());
    assert!(ctx.has_repository("Order"));
}
536
/// `entity_with_behavior` registers the derived entity, its repository and its behavior.
#[test]
fn runtime_module_registers_derived_entity_and_behavior() {
    let module = RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.has_repository("CatalogProduct"));
    assert!(ctx.repository_behavior("CatalogProduct").is_some());
}
546
/// `module!` with an entity type registers that entity and its repository.
///
/// NOTE(review): despite the test name, only a single entity is passed to the macro here.
#[test]
fn module_macro_registers_multiple_entities() {
    let module = crate::module!(CatalogProductRow);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.has_repository("CatalogProduct"));
}
553
/// `module!(Entity => Behavior)` registers the entity together with its behavior.
#[test]
fn module_macro_registers_entity_behavior_pairs() {
    let module = crate::module!(CatalogProductRow => OrderBehavior);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.repository_behavior("CatalogProduct").is_some());
}
561
/// A versioned update that affects zero rows is reported as an optimistic lock conflict.
#[test]
fn repository_returns_optimistic_lock_conflict() {
    let store = InMemoryMetadataStore::new().with_entity(entity());
    // Zero affected rows is what the executor reports for the versioned update.
    let executor = StubExecutor {
        affected: 0,
        rows: Vec::new(),
    };
    let repo = Repository::new(&PostgresDialect, &store, &executor);

    let command = UpdateCommand::new("Order", 1_u64)
        .expected_version(3)
        .value("name", "next");
    let err = repo.update(&command).unwrap_err();

    let is_conflict = matches!(
        err,
        RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
    );
    if !is_conflict {
        panic!("unexpected error: {err}");
    }
}
584
/// Typed resources, named resources and locals stored on the context can be read back.
#[test]
fn user_context_indexes_resources_and_locals() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let mut ctx = UserContext::new().with_metadata(metadata);
    ctx.insert_resource::<u64>(42);
    ctx.insert_named_resource("tenant", String::from("acme"));
    ctx.put_local("trace_id", "req-1");

    assert!(ctx.entity("Order").is_some());
    assert_eq!(ctx.get_resource::<u64>(), Some(&42));

    let expected_tenant = String::from("acme");
    assert_eq!(
        ctx.get_named_resource::<String>("tenant"),
        Some(&expected_tenant)
    );

    let expected_trace = Value::Text("req-1".to_owned());
    assert_eq!(ctx.local("trace_id"), Some(&expected_trace));
}
604
/// `UserContext::repository` yields a repository that runs updates through the stored
/// dialect and executor resources.
#[test]
fn user_context_builds_context_repository() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let mut ctx = UserContext::new().with_metadata(metadata);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
    let command = UpdateCommand::new("Order", 1_u64)
        .expected_version(3)
        .value("name", "next");
    let affected = repo.update(&command).unwrap();

    assert_eq!(affected, 1);
}
626
/// A repository resolved by entity name is bound to that entity for selects and inserts.
#[test]
fn user_context_resolves_repository_by_entity_type() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    assert_eq!(repo.entity(), "Order");
    assert_eq!(repo.select().entity, "Order");

    let command = repo
        .insert_command()
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "n");
    let affected = repo.insert(&command).unwrap();
    assert_eq!(affected, 1);
}
655
/// The registered `OrderBehavior` shapes compiled selects, takes part in inserts and
/// supplies the relation loads of a resolved repository.
#[test]
fn resolved_repository_applies_behavior_hooks() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors = InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // `before_select` installs the version filter, visible in the compiled SQL.
    let select = repo.select();
    let compiled = repo.compile(&select).unwrap();
    assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));

    let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
    let affected = repo.insert(&insert).unwrap();
    assert_eq!(affected, 1);

    let expected_loads = vec!["lines".to_owned()];
    assert_eq!(repo.relation_loads(), expected_loads);
}
687
/// Preparing an insert without an id takes the id from the configured generator and the
/// version default from `OrderBehavior`, while keeping the caller's `name`.
#[test]
fn resolved_repository_prepares_insert_command_with_generated_id() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors = InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(FixedIdGenerator(42));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let draft = repo.insert_command().value("name", "n");
    let prepared = repo.prepare_insert_command(&draft).unwrap();

    let values = &prepared.values;
    assert_eq!(values.get("id"), Some(&Value::U64(42)));
    assert_eq!(values.get("version"), Some(&Value::I64(1)));
    let expected_name = Value::Text("n".to_owned());
    assert_eq!(values.get("name"), Some(&expected_name));
}
723
/// Saving a new Order -> OrderLine -> Product graph assigns sequential ids (500, 501, 502)
/// and fills the `order_id` / `product_id` keys on the line from the related nodes.
#[test]
fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors = InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(SequentialIdGenerator::new(500));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Build the graph leaf-first: product, then the line holding it, then the root order.
    let product_node = GraphNode::new("Product").value("name", "sku-1");
    let line_node = GraphNode::new("OrderLine")
        .value("name", "line-1")
        .relation("product", product_node);
    let graph = GraphNode::new("Order")
        .value("name", "root")
        .relation("lines", line_node);

    let saved = repo.save_graph(graph).unwrap();

    assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
    assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));

    let lines = saved.relations.get("lines").unwrap();
    assert_eq!(lines.len(), 1);
    let line_values = &lines[0].values;
    assert_eq!(line_values.get("id"), Some(&Value::U64(501)));
    assert_eq!(line_values.get("order_id"), Some(&Value::U64(500)));
    assert_eq!(line_values.get("product_id"), Some(&Value::U64(502)));

    let product = lines[0].relations.get("product").unwrap();
    assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
}
766
/// A typed entity tree is first extracted into a `GraphNode` (a missing id shows up as
/// `Value::Null`), then saved with sequential ids (700, 701, 702) and linked keys.
#[test]
fn resolved_repository_extracts_and_saves_typed_entity_graph() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_internal_id_generator(SequentialIdGenerator::new(700));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Assemble the typed tree leaf-first; every id and key starts out unset.
    let product = TypedGraphProduct {
        id: None,
        name: "typed-product".to_owned(),
    };
    let line = TypedGraphLine {
        id: None,
        order_id: None,
        name: "typed-line".to_owned(),
        product_id: None,
        product: Some(product),
    };
    let order = TypedGraphOrder {
        id: None,
        version: 1,
        name: "typed-root".to_owned(),
        lines: teaql_core::SmartList::from(vec![line]),
    };

    let extracted = repo.graph_node_from_entity(order).unwrap();
    assert_eq!(extracted.entity, "Order");
    let root_name = Value::Text("typed-root".to_owned());
    assert_eq!(extracted.values.get("name"), Some(&root_name));
    assert_eq!(extracted.values.get("id"), Some(&Value::Null));
    assert_eq!(extracted.relations["lines"].len(), 1);
    let line_name = Value::Text("typed-line".to_owned());
    assert_eq!(
        extracted.relations["lines"][0].values.get("name"),
        Some(&line_name)
    );
    assert_eq!(
        extracted.relations["lines"][0].relations["product"].len(),
        1
    );

    let saved = repo.save_graph(extracted).unwrap();
    assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
    let lines = saved.relations.get("lines").unwrap();
    let saved_line = &lines[0];
    assert_eq!(saved_line.values.get("id"), Some(&Value::U64(701)));
    assert_eq!(saved_line.values.get("order_id"), Some(&Value::U64(700)));
    assert_eq!(saved_line.values.get("product_id"), Some(&Value::U64(702)));
    assert_eq!(
        saved_line.relations["product"][0].values.get("id"),
        Some(&Value::U64(702))
    );
}
831
/// `save_entity_graph` accepts the typed tree directly and produces the same id and key
/// wiring (800 for the order, 802 for the product).
#[test]
fn resolved_repository_saves_typed_entity_graph_directly() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_internal_id_generator(SequentialIdGenerator::new(800));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let product = TypedGraphProduct {
        id: None,
        name: "typed-product".to_owned(),
    };
    let line = TypedGraphLine {
        id: None,
        order_id: None,
        name: "typed-line".to_owned(),
        product_id: None,
        product: Some(product),
    };
    let order = TypedGraphOrder {
        id: None,
        version: 1,
        name: "typed-direct".to_owned(),
        lines: teaql_core::SmartList::from(vec![line]),
    };

    let saved = repo.save_entity_graph(order).unwrap();

    assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
    let saved_line = &saved.relations["lines"][0];
    assert_eq!(saved_line.values.get("order_id"), Some(&Value::U64(800)));
    assert_eq!(saved_line.values.get("product_id"), Some(&Value::U64(802)));
}
880
/// `ContextAwareOrderBehavior` derives `name` and `version` for an empty insert command
/// from the named resources and the local stored on the context.
#[test]
fn custom_user_context_can_drive_insert_preparation() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors = InMemoryRepositoryBehaviorRegistry::new()
        .with_behavior("Order", ContextAwareOrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(FixedIdGenerator(99));
    ctx.insert_named_resource("tenant", String::from("acme"));
    ctx.insert_named_resource("initial_version", 7_i64);
    ctx.put_local("trace_id", "req-9");
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let empty_command = repo.insert_command();
    let prepared = repo.prepare_insert_command(&empty_command).unwrap();

    let values = &prepared.values;
    assert_eq!(values.get("id"), Some(&Value::U64(99)));
    assert_eq!(values.get("version"), Some(&Value::I64(7)));
    let expected_name = Value::Text("acme:req-9".to_owned());
    assert_eq!(values.get("name"), Some(&expected_name));
}
912
/// A valid insert gets the checker's `version` default and carries no status marker field;
/// a too-short `name` is rejected with a single check result located at `name`.
#[test]
fn checker_registry_validates_and_fixes_insert_commands() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_checker_registry(checkers)
        .with_internal_id_generator(FixedIdGenerator(77));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let valid = repo.insert_command().value("name", "valid");
    let prepared = repo.prepare_insert_command(&valid).unwrap();
    assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
    assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
    assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));

    // "no" is shorter than the checker's minimum length of 3.
    let too_short = repo.insert_command().value("name", "no");
    let error = repo.prepare_insert_command(&too_short).unwrap_err();
    let results = match error {
        RuntimeError::Check(results) => results,
        other => panic!("unexpected checker error: {other:?}"),
    };
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].location.to_string(), "name");
}
948
/// An update that omits `name` passes, while an update with a too-short `name` fails with
/// a single check result located at `name`.
#[test]
fn checker_registry_validates_update_commands_without_required_insert_checks() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_checker_registry(checkers);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let version_only = repo.update_command(1_u64).value("version", 1_i64);
    repo.update(&version_only).unwrap();

    let too_short = repo.update_command(1_u64).value("name", "no");
    let error = repo.update(&too_short).unwrap_err();
    let results = match error {
        RepositoryError::Runtime(RuntimeError::Check(results)) => results,
        other => panic!("unexpected checker error: {other:?}"),
    };
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].location.to_string(), "name");
}
978
/// `Language::ALL` lists 15 languages, each yields a non-empty translation of a
/// "required" check result, and English, Chinese and French wording appears among them.
/// Also checks that `zh-CN` and `zh-TW` resolve to the two Chinese variants.
#[test]
fn built_in_language_translators_cover_fifteen_languages() {
    assert_eq!(Language::ALL.len(), 15);
    let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
    let messages = Language::ALL
        .iter()
        .map(|language| translate_check_result(*language, &result))
        .collect::<Vec<_>>();

    assert!(messages.iter().all(|message| !message.is_empty()));
    assert!(messages.iter().any(|message| message.contains("required")));
    // U+5FC5 U+586B is the Chinese word for "required". The literal had been stored as
    // mojibake (its UTF-8 bytes decoded as windows-1252); escapes keep it encoding-proof.
    assert!(
        messages
            .iter()
            .any(|message| message.contains("\u{5fc5}\u{586b}"))
    );
    assert!(
        messages
            .iter()
            .any(|message| message.contains("obligatoire"))
    );
    assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
    assert_eq!(
        Language::from_code("zh-TW"),
        Some(Language::TraditionalChinese)
    );
}
1002
/// With the context language set to Chinese, the message of a failed "required" check is
/// in Chinese; separately, `set_language_code("es")` switches a context to Spanish.
#[test]
fn user_context_language_switch_translates_checker_errors() {
    let mut ctx = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
        .with_internal_id_generator(FixedIdGenerator(77))
        .with_language(Language::Chinese);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    // The empty command has no `name`, so the checker's `required` rule fires.
    let error = repo
        .prepare_insert_command(&repo.insert_command())
        .unwrap_err();
    match error {
        RuntimeError::Check(results) => {
            assert_eq!(results.len(), 1);
            // U+5FC5 U+586B is the Chinese word for "required". The literal had been stored
            // as mojibake (its UTF-8 bytes decoded as windows-1252).
            assert!(
                results[0]
                    .message
                    .as_ref()
                    .is_some_and(|message| message.contains("\u{5fc5}\u{586b}"))
            );
        }
        other => panic!("unexpected checker error: {other:?}"),
    }

    let mut ctx = UserContext::new().with_language(Language::English);
    ctx.set_language_code("es").unwrap();
    assert_eq!(ctx.language(), Language::Spanish);
}
1040
/// Saving a graph node for an existing row runs the checker in update mode: the name is
/// rewritten to `graph-update-checked`, the version moves from 1 to 2, and the saved
/// values carry no status marker field.
#[test]
fn checker_registry_merges_graph_update_fixes_by_object_status() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_checker_registry(checkers);
    ctx.insert_resource(PostgresDialect);

    // The row the executor returns for every fetch: id 1, version 1, name "old".
    let existing_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("old".to_owned())),
    ]);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![existing_row],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let node = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "graph-update");
    let saved = repo.save_graph(node).unwrap();

    let checked_name = Value::Text("graph-update-checked".to_owned());
    assert_eq!(saved.values.get("name"), Some(&checked_name));
    assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
    assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
}
1076
/// Runs insert, update, delete and recover through a resolved `Order` repository and
/// checks what the registered `RecordingEventSink` collected.
///
/// Four events are expected, in order: Created, Updated, Deleted, Recovered. The
/// Created and Updated events must carry id 88 (the value handed to
/// `FixedIdGenerator`), and the Updated event must report version 2 with `name` as its
/// only updated field.
#[test]
fn user_context_event_sink_receives_repository_mutation_events() {
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let sink = RecordingEventSink {
        events: Arc::clone(&recorded),
    };
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_internal_id_generator(FixedIdGenerator(88))
        .with_event_sink(sink);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // One call per mutation kind, each expected to succeed.
    repo.insert(&repo.insert_command().value("name", "created"))
        .unwrap();
    repo.update(
        &repo
            .update_command(88_u64)
            .expected_version(1)
            .value("name", "updated"),
    )
    .unwrap();
    repo.delete(&repo.delete_command(88_u64).expected_version(2))
        .unwrap();
    repo.recover(&repo.recover_command(88_u64, -3)).unwrap();

    let seen = recorded.lock().unwrap();
    assert_eq!(seen.len(), 4);

    let created = &seen[0];
    assert_eq!(created.kind, EntityEventKind::Created);
    assert_eq!(created.entity, "Order");
    assert_eq!(created.values.get("id"), Some(&Value::U64(88)));

    let updated = &seen[1];
    assert_eq!(updated.kind, EntityEventKind::Updated);
    assert_eq!(updated.values.get("id"), Some(&Value::U64(88)));
    assert_eq!(updated.values.get("version"), Some(&Value::I64(2)));
    assert_eq!(updated.updated_fields, vec![String::from("name")]);

    assert_eq!(seen[2].kind, EntityEventKind::Deleted);
    assert_eq!(seen[3].kind, EntityEventKind::Recovered);
}
1121
/// Saves an `Order` graph with one attached `OrderLine` and checks the events collected
/// by the registered `RecordingEventSink`.
///
/// Three events are expected: an Updated `Order`, an Updated `OrderLine` whose
/// `order_id` is 1, and a Deleted `OrderLine`.
#[test]
fn user_context_event_sink_receives_mixed_graph_mutation_events() {
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let sink = RecordingEventSink {
        events: Arc::clone(&recorded),
    };
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_event_sink(sink);

    // Single row served by the stub executor.
    let stub_row = Record::from([
        (String::from("id"), Value::U64(1)),
        (String::from("version"), Value::I64(1)),
        (String::from("name"), Value::Text(String::from("old"))),
    ]);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stub_row],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let line = GraphNode::new("OrderLine")
        .value("name", "line")
        .value("product_id", 3_u64);
    let graph = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "updated")
        .relation("lines", line);
    repo.save_graph(graph).unwrap();

    // NOTE(review): the graph attaches a single `OrderLine` without an id, yet an
    // Updated and a Deleted `OrderLine` event are asserted below. Presumably this comes
    // from how `save_graph` reconciles the node with the stub's rows - confirm.
    let seen = recorded.lock().unwrap();
    assert_eq!(seen.len(), 3);

    let order_event = &seen[0];
    assert_eq!(order_event.kind, EntityEventKind::Updated);
    assert_eq!(order_event.entity, "Order");

    let line_event = &seen[1];
    assert_eq!(line_event.kind, EntityEventKind::Updated);
    assert_eq!(line_event.entity, "OrderLine");
    assert_eq!(line_event.values.get("order_id"), Some(&Value::U64(1)));

    let removal_event = &seen[2];
    assert_eq!(removal_event.kind, EntityEventKind::Deleted);
    assert_eq!(removal_event.entity, "OrderLine");
}
1173
/// Builds a save plan for an `Order` graph with six `OrderLine` relations and checks how
/// the plan groups them.
///
/// The graph holds two lines without an id, two lines with id and version, one line
/// marked with `remove()` and one marked with `reference()`. The grouped counts must be
/// one `Order` update plus two creates, two updates, one delete and one reference for
/// `OrderLine`. The create batch must carry ids 500 and 501 (the context uses
/// `SequentialIdGenerator::new(500)`), and the `OrderLine` update batch whose update
/// fields are exactly `name` must hold two items.
#[test]
fn save_graph_builds_plan_grouped_by_entity_and_operation() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_internal_id_generator(SequentialIdGenerator::new(500));

    // Single row served by the stub executor.
    let stub_row = Record::from([
        (String::from("id"), Value::U64(1)),
        (String::from("version"), Value::I64(1)),
        (String::from("name"), Value::Text(String::from("old"))),
    ]);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stub_row],
    });

    // Lines without an id.
    let created_a = GraphNode::new("OrderLine")
        .value("name", "new-line-a")
        .value("product_id", 2_u64);
    let created_b = GraphNode::new("OrderLine")
        .value("name", "new-line-b")
        .value("product_id", 2_u64);
    // Lines with id and version that only change `name`.
    let updated_a = GraphNode::new("OrderLine")
        .value("id", 5_u64)
        .value("version", 1_i64)
        .value("name", "same-update-a");
    let updated_b = GraphNode::new("OrderLine")
        .value("id", 6_u64)
        .value("version", 1_i64)
        .value("name", "same-update-b");
    let removed = GraphNode::new("OrderLine").value("id", 3_u64).remove();
    let referenced = GraphNode::new("OrderLine").value("id", 4_u64).reference();

    let graph = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "updated")
        .relation("lines", created_a)
        .relation("lines", created_b)
        .relation("lines", updated_a)
        .relation("lines", updated_b)
        .relation("lines", removed)
        .relation("lines", referenced);

    let plan = ctx
        .plan_for_save_graph::<PostgresDialect, StubExecutor>(graph)
        .unwrap();
    let counts = plan.grouped_counts();

    let expected_counts = [
        ("Order", GraphMutationKind::Update, 1),
        ("OrderLine", GraphMutationKind::Create, 2),
        ("OrderLine", GraphMutationKind::Update, 2),
        ("OrderLine", GraphMutationKind::Delete, 1),
        ("OrderLine", GraphMutationKind::Reference, 1),
    ];
    for (entity_name, kind, expected) in expected_counts {
        assert_eq!(counts.get(&(entity_name.to_owned(), kind)), Some(&expected));
    }

    let create_batch = plan
        .batches
        .iter()
        .find(|candidate| {
            candidate.entity == "OrderLine" && candidate.kind == GraphMutationKind::Create
        })
        .unwrap();
    assert_eq!(create_batch.items.len(), 2);
    assert_eq!(
        create_batch.items[0].values.get("id"),
        Some(&Value::U64(500))
    );
    assert_eq!(
        create_batch.items[1].values.get("id"),
        Some(&Value::U64(501))
    );

    let name_only = vec![String::from("name")];
    let update_batch = plan
        .batches
        .iter()
        .find(|candidate| {
            candidate.entity == "OrderLine"
                && candidate.kind == GraphMutationKind::Update
                && candidate.update_fields == name_only
        })
        .unwrap();
    assert_eq!(update_batch.items.len(), 2);
}
1284
/// Resolves the `Order` repository with `OrderBehavior` registered and checks the
/// relation plans it reports.
///
/// Exactly one plan is expected: relation `lines` targeting `OrderLine`, joining local
/// key `id` to foreign key `order_id`, flagged as `many`.
#[test]
fn resolved_repository_builds_relation_plans() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let plans = repo.relation_plans().unwrap();

    assert_eq!(plans.len(), 1);
    let lines_plan = &plans[0];
    assert_eq!(lines_plan.relation_name, "lines");
    assert_eq!(lines_plan.target_entity, "OrderLine");
    assert_eq!(lines_plan.local_key, "id");
    assert_eq!(lines_plan.foreign_key, "order_id");
    assert!(lines_plan.many);
}
1316
/// Builds the `lines` relation query for two parent rows (ids 11 and 12) and checks the
/// compiled SQL.
///
/// The SQL must select from `"orderline"`, filter with `"order_id" IN ($1, $2)`, and
/// bind the two parent ids as parameters in that order.
#[test]
fn resolved_repository_builds_relation_query_from_parent_rows() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Parent rows only need the key that the relation joins on.
    let parent_rows: Vec<_> = [11_u64, 12_u64]
        .iter()
        .map(|&id| Record::from([("id".to_owned(), Value::U64(id))]))
        .collect();

    let query = repo.relation_query("lines", &parent_rows).unwrap();
    let compiled = repo.compile(&query).unwrap();

    assert!(compiled.sql.contains("FROM \"orderline\""));
    assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
    assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
}
1350
/// Enhances two parent rows (ids 11 and 12) with their `lines` relation and checks how
/// the rows served by the stub executor are attached.
///
/// The stub serves three line rows: two with `order_id` 11 and one with `order_id` 12.
/// After `enhance_relations`, the first parent must hold a `Value::List` of two lines
/// and the second a `Value::List` of one line.
#[test]
fn resolved_repository_enhances_parent_rows_with_relations() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);

    let line_row = |id: u64, order_id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("order_id".to_owned(), Value::U64(order_id)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![
            line_row(101, 11, "l1"),
            line_row(102, 11, "l2"),
            line_row(201, 12, "l3"),
        ],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let mut parents = vec![
        Record::from([("id".to_owned(), Value::U64(11))]),
        Record::from([("id".to_owned(), Value::U64(12))]),
    ];

    repo.enhance_relations(&mut parents).unwrap();

    let first_payload = parents[0].get("lines");
    let Some(Value::List(first_lines)) = first_payload else {
        panic!("unexpected lines payload: {first_payload:?}");
    };
    assert_eq!(first_lines.len(), 2);

    let second_payload = parents[1].get("lines");
    let Some(Value::List(second_lines)) = second_payload else {
        panic!("unexpected lines payload: {second_payload:?}");
    };
    assert_eq!(second_lines.len(), 1);
}
1405
/// Fetches `OrderEntity` values through a resolved `Order` repository whose stub
/// executor serves a single row, and checks the typed result.
///
/// One entity is expected, with id 7, version 2 and name "typed".
#[test]
fn resolved_repository_fetches_smart_list_of_entities() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);

    let stub_row = Record::from([
        ("id".to_owned(), Value::U64(7)),
        ("version".to_owned(), Value::I64(2)),
        ("name".to_owned(), Value::Text("typed".to_owned())),
    ]);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stub_row],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();

    let expected = OrderEntity {
        id: 7,
        version: 2,
        name: "typed".to_owned(),
    };
    assert_eq!(rows.len(), 1);
    assert_eq!(rows.first(), Some(&expected));
}
1436
/// Fetches `CatalogProductRow` values through a repository registered under
/// `CatalogProduct`, using the descriptor returned by
/// `CatalogProductRow::entity_descriptor()` as metadata.
///
/// The stub executor serves one row; one entity with id 9 and name "derived" is
/// expected.
#[test]
fn resolved_repository_fetches_smart_list_of_derived_entities() {
    let metadata =
        InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("CatalogProduct");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);

    let stub_row = Record::from([
        ("id".to_owned(), Value::U64(9)),
        ("name".to_owned(), Value::Text("derived".to_owned())),
    ]);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stub_row],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
        .unwrap();
    let rows = repo
        .fetch_entities::<CatalogProductRow>(&repo.select())
        .unwrap();

    let expected = CatalogProductRow {
        id: 9,
        name: "derived".to_owned(),
    };
    assert_eq!(rows.len(), 1);
    assert_eq!(rows.first(), Some(&expected));
}
1471
/// Fetches `OrderAggregateDynamic` values from a row that carries `lineCount` and
/// `amount` next to `id`, and checks that both end up in the entity's `dynamic` map and
/// in its JSON form.
///
/// The JSON of the single fetched entity must be
/// `{"id": 1, "lineCount": 3, "amount": 18.5}`.
#[test]
fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
    let metadata =
        InMemoryMetadataStore::new().with_entity(OrderAggregateDynamic::entity_descriptor());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("OrderAggregate");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);

    let stub_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
        ("amount".to_owned(), Value::F64(18.5)),
    ]);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stub_row],
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
        .unwrap();
    let rows = repo
        .fetch_entities::<OrderAggregateDynamic>(&repo.select())
        .unwrap();

    assert_eq!(rows.len(), 1);
    let aggregate = &rows.data[0];
    assert_eq!(aggregate.id, 1);
    assert_eq!(aggregate.dynamic.get("lineCount"), Some(&Value::I64(3)));
    assert_eq!(aggregate.dynamic.get("amount"), Some(&Value::F64(18.5)));

    let expected_json = serde_json::json!({
        "id": 1,
        "lineCount": 3,
        "amount": 18.5
    });
    assert_eq!(
        rows.into_vec().into_iter().next().unwrap().into_json(),
        expected_json
    );
}
1512
/// Fetches two `Order` rows together with a `lines` relation aggregate named
/// `lineCount` and checks both the merged rows and the SQL recorded by the executor.
///
/// The `QueueExecutor` is loaded with two result sets: the two parent rows, then a
/// single aggregate row for `order_id` 1 with `lineCount` 3. The first returned row must
/// carry `lineCount` `I64(3)`, the second `U64(0)`. Two queries must have been recorded
/// and the second must equal the expected grouped `COUNT(*)` statement.
#[test]
fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let parent_rows = vec![order_row(1, "first"), order_row(2, "second")];
    let aggregate_rows = vec![Record::from([
        ("order_id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
    ])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity());
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let parent_query = repo
        .select()
        .project("id")
        .project("version")
        .project("name");
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
    let rows = repo
        .fetch_all_with_relation_aggregates(&parent_query, &[line_count])
        .unwrap();

    assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
    assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap();
    let expected_sql = "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\"";
    assert_eq!(issued.len(), 2);
    assert_eq!(issued[1], expected_sql);
}
1577
/// Runs the same cache-enabled count query twice with an `InMemoryAggregationCache`
/// registered as a resource.
///
/// The `QueueExecutor` holds a single result set. Both fetches must return equal rows
/// and the executor must have recorded exactly one query.
#[test]
fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
    let count_rows = vec![Record::from([("count".to_owned(), Value::I64(2))])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([count_rows])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    ctx.insert_resource(InMemoryAggregationCache::default());

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = repo
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = repo.fetch_all(&cached_count).unwrap();
    let second = repo.fetch_all(&cached_count).unwrap();
    assert_eq!(first, second);

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap().len();
    assert_eq!(issued, 1);
}
1610
/// Registers a namespaced `InMemoryAggregationCache` behind
/// `Arc<dyn AggregationCacheBackend>` and checks that an insert causes the next fetch
/// of a cache-enabled count query to return a different result.
///
/// The `QueueExecutor` holds two result sets (count 2, then count 3). The first two
/// fetches must be equal, the fetch after the insert must differ, and exactly two
/// queries must have been recorded.
#[test]
fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
    let count_rows = |count: i64| vec![Record::from([("count".to_owned(), Value::I64(count))])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([count_rows(2), count_rows(3)])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    // The cache is registered as a trait object rather than as the concrete type.
    let cache: Arc<dyn AggregationCacheBackend> =
        Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"));
    ctx.insert_resource(cache);

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = repo
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = repo.fetch_all(&cached_count).unwrap();
    let cached = repo.fetch_all(&cached_count).unwrap();
    repo.insert(
        &InsertCommand::new("Order")
            .value("id", 9_u64)
            .value("version", 1_i64)
            .value("name", "new"),
    )
    .unwrap();
    let refreshed = repo.fetch_all(&cached_count).unwrap();

    assert_eq!(first, cached);
    assert_ne!(cached, refreshed);

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap().len();
    assert_eq!(issued, 2);
}
1655
/// Runs a parent query with a `lines` relation aggregate twice, with aggregation
/// caching enabled on the query and propagated via `propagate_aggregation_cache`.
///
/// The `QueueExecutor` holds two result sets (parent rows, then aggregate rows). Both
/// runs must return equal rows and the executor must have recorded exactly two queries
/// in total.
#[test]
fn aggregation_cache_propagates_to_relation_aggregates() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([
            vec![order_row(1, "first"), order_row(2, "second")],
            vec![Record::from([
                ("order_id".to_owned(), Value::U64(1)),
                ("lineCount".to_owned(), Value::I64(3)),
            ])],
        ])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity());
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    ctx.insert_resource(InMemoryAggregationCache::default());

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let parent_query = repo
        .select()
        .project("id")
        .project("version")
        .project("name")
        .enable_aggregation_cache_for(60_000)
        .propagate_aggregation_cache(60_000);
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);

    let first = repo
        .fetch_all_with_relation_aggregates(&parent_query, &[line_count.clone()])
        .unwrap();
    let second = repo
        .fetch_all_with_relation_aggregates(&parent_query, &[line_count])
        .unwrap();
    assert_eq!(first, second);

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap().len();
    assert_eq!(issued, 2);
}
1714
/// Fetches typed `Order` entities from a `MemoryRepository` using a filter, a
/// descending order and a limit.
///
/// Three rows (ids 1 to 3) are stored. The query keeps ids `>= 2`, orders by `id`
/// descending and limits to one row, so only the row with id 3 ("gamma", version 1)
/// may come back.
#[test]
fn memory_repository_fetches_smart_list_entities_with_query_features() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![
        order_row(1, "alpha"),
        order_row(2, "beta"),
        order_row(3, "gamma"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let at_least_two = Expr::Binary {
        left: Box::new(Expr::column("id")),
        op: teaql_core::BinaryOp::Gte,
        right: Box::new(Expr::value(2_u64)),
    };
    let query = teaql_core::SelectQuery::new("Order")
        .filter(at_least_two)
        .order_by(OrderBy::desc("id"))
        .limit(1);

    let orders = repository.fetch_entities::<Order>(&query).unwrap();

    assert_eq!(orders.ids(), vec![Value::U64(3)]);
    assert_eq!(orders.versions(), vec![1]);
    assert_eq!(orders.first().unwrap().name, "gamma");
}
1754
/// Runs an ungrouped count and sum over two stored `Order` rows in a
/// `MemoryRepository`, with the query spelled out as a full `SelectQuery` literal.
///
/// A single result row is expected, with `count` equal to `U64(2)` and `versionSum`
/// equal to `U64(3)` (versions 1 and 2).
#[test]
fn memory_repository_runs_aggregates() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![order_row(1, 1, "alpha"), order_row(2, 2, "beta")];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let count_ids = Aggregate {
        function: AggregateFunction::Count,
        field: "id".to_owned(),
        alias: "count".to_owned(),
    };
    let sum_versions = Aggregate {
        function: AggregateFunction::Sum,
        field: "version".to_owned(),
        alias: "versionSum".to_owned(),
    };
    // Every field is listed explicitly; only `entity` and `aggregates` are non-empty.
    let query = teaql_core::SelectQuery {
        entity: "Order".to_owned(),
        projection: vec![],
        expr_projection: vec![],
        filter: None,
        having: None,
        order_by: vec![],
        slice: None,
        aggregates: vec![count_ids, sum_versions],
        group_by: vec![],
        relations: vec![],
        aggregation_cache: None,
        comment: None,
        raw_sql: None,
        raw_sql_search_criteria: vec![],
        json_expr: None,
        dynamic_properties: vec![],
        raw_projections: vec![],
        object_group_bys: vec![],
        child_enhancements: vec![],
    };

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let totals = &rows[0];
    assert_eq!(totals.get("count"), Some(&Value::U64(2)));
    assert_eq!(totals.get("versionSum"), Some(&Value::U64(3)));
}
1813
/// Combines `between`, `not_like` and `not_in_list` filters with a grouped count and
/// sum on a `MemoryRepository`.
///
/// Of the three stored rows, the "tmp-beta" row is excluded by `not_like("name",
/// "tmp%")`. The remaining two "alpha" rows form one group with `total` `U64(2)` and
/// `versionSum` `U64(3)`.
#[test]
fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![
        order_row(1, 1, "alpha"),
        order_row(2, 2, "alpha"),
        order_row(3, 3, "tmp-beta"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let criteria = Expr::between("version", 1_i64, 3_i64)
        .and_expr(Expr::not_like("name", "tmp%"))
        .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")]));
    let query = teaql_core::SelectQuery::new("Order")
        .filter(criteria)
        .group_by("name")
        .count("total")
        .sum("version", "versionSum");

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));
    assert_eq!(group.get("versionSum"), Some(&Value::U64(3)));
}
1860
/// Runs grouped `count`, `stddev`, `var_pop` and `bit_or` aggregates with a `having`
/// clause on a `MemoryRepository`.
///
/// Rows are grouped by `name`; `having(total > 1)` must leave only the "alpha" group
/// (versions 1 and 3). For that group the test pins `total` to `U64(2)`, the JSON form
/// of `stddevVersion` to the string "1.4142135623730951454746218583", `varPopVersion`
/// to `Decimal::ONE` and `bitOrVersion` to `I64(3)`.
#[test]
fn memory_repository_runs_extended_aggregates_and_having() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![
        order_row(1, 1, "alpha"),
        order_row(2, 3, "alpha"),
        order_row(3, 7, "beta"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let query = teaql_core::SelectQuery::new("Order")
        .group_by("name")
        .count("total")
        .stddev("version", "stddevVersion")
        .var_pop("version", "varPopVersion")
        .bit_or("version", "bitOrVersion")
        .having(Expr::gt("total", 1_i64));

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));

    let expected_stddev =
        serde_json::Value::String(String::from("1.4142135623730951454746218583"));
    assert_eq!(
        group.get("stddevVersion").map(Value::to_json_value),
        Some(expected_stddev)
    );
    assert_eq!(
        group.get("varPopVersion"),
        Some(&Value::Decimal(Decimal::ONE))
    );
    assert_eq!(group.get("bitOrVersion"), Some(&Value::I64(3)));
}
1915
/// Filters stored `Order` rows with `Expr::sound_like("name", "Robert")` on a
/// `MemoryRepository`.
///
/// Of "Robert", "Rupert" and "Ashcraft", the first two must match; results are ordered
/// by ascending `id`.
#[test]
fn memory_repository_runs_sound_like_filter() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![
        order_row(1, "Robert"),
        order_row(2, "Rupert"),
        order_row(3, "Ashcraft"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let query = teaql_core::SelectQuery::new("Order")
        .filter(Expr::sound_like("name", "Robert"))
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 2);
    assert_eq!(
        rows[0].get("name"),
        Some(&Value::Text(String::from("Robert")))
    );
    assert_eq!(
        rows[1].get("name"),
        Some(&Value::Text(String::from("Rupert")))
    );
}
1952
/// Combines the `contain`, `begin_with`, `end_with` string filters and their negated
/// counterparts on a `MemoryRepository`.
///
/// Of "tea-order", "coffee-order" and "tea-archived", only "tea-order" satisfies every
/// predicate in the conjunction.
#[test]
fn memory_repository_runs_java_style_string_match_filters() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![
        order_row(1, "tea-order"),
        order_row(2, "coffee-order"),
        order_row(3, "tea-archived"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let criteria = Expr::contain("name", "tea")
        .and_expr(Expr::begin_with("name", "tea"))
        .and_expr(Expr::end_with("name", "order"))
        .and_expr(Expr::not_contain("name", "coffee"))
        .and_expr(Expr::not_begin_with("name", "archived"))
        .and_expr(Expr::not_end_with("name", "draft"));
    let query = teaql_core::SelectQuery::new("Order")
        .filter(criteria)
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    assert_eq!(
        rows[0].get("name"),
        Some(&Value::Text(String::from("tea-order")))
    );
}
2004
/// Filters stored `Order` rows by comparing two columns of the same row with
/// `Expr::compare_columns("version", BinaryOp::Gte, "id")`.
///
/// The row with id 1 / version 2 ("keep") satisfies `version >= id`; the row with
/// id 2 / version 1 ("skip") does not.
#[test]
fn memory_repository_runs_property_to_property_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let stored = vec![order_row(1, 2, "keep"), order_row(2, 1, "skip")];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", stored);

    let query = teaql_core::SelectQuery::new("Order")
        .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    assert_eq!(
        rows[0].get("name"),
        Some(&Value::Text(String::from("keep")))
    );
}
2035
/// Walks one `Order` row through insert, update, a stale update, delete and recover on
/// a `MemoryRepository`, checking the stored version after each successful step.
///
/// Versions asserted along the way: 2 after the update, -3 after the delete, 4 after
/// the recover. The second update reuses expected version 1 and must fail with
/// `RuntimeError::OptimisticLockConflict`.
#[test]
fn memory_repository_supports_mutations_and_optimistic_locking() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata);

    // Reads back the row with id 10; panics if the query fails or returns no row.
    let fetch_order = || {
        repository
            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
            .unwrap()
            .pop()
            .unwrap()
    };

    repository
        .insert(
            &InsertCommand::new("Order")
                .value("id", 10_u64)
                .value("version", 1_i64)
                .value("name", "draft"),
        )
        .unwrap();
    repository
        .update(
            &UpdateCommand::new("Order", 10_u64)
                .expected_version(1)
                .value("name", "submitted"),
        )
        .unwrap();

    let after_update = fetch_order();
    assert_eq!(
        after_update.get("name"),
        Some(&Value::Text("submitted".to_owned()))
    );
    assert_eq!(after_update.get("version"), Some(&Value::I64(2)));

    // Same expected version as the update above, which is now stale.
    let conflict = repository
        .update(
            &UpdateCommand::new("Order", 10_u64)
                .expected_version(1)
                .value("name", "stale"),
        )
        .unwrap_err();
    assert!(matches!(
        conflict,
        RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
    ));

    repository
        .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
        .unwrap();
    let after_delete = fetch_order();
    assert_eq!(after_delete.get("version"), Some(&Value::I64(-3)));

    repository
        .recover(&RecoverCommand::new("Order", 10_u64, -3))
        .unwrap();
    let after_recover = fetch_order();
    assert_eq!(after_recover.get("version"), Some(&Value::I64(4)));
}
2100}
2101
2102#[cfg(all(test, feature = "sqlx"))]
2103mod sqlx_integration_tests {
2104 use super::sqlx_support::{
2105 MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2106 SqliteIdSpaceGenerator, SqliteMutationExecutor,
2107 };
2108 use super::{
2109 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2110 InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2111 RepositoryBehavior, UserContext,
2112 };
2113 use chrono::{NaiveDate, TimeZone, Utc};
2114 use teaql_core::{
2115 DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2116 PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2117 };
2118 use teaql_dialect_pg::PostgresDialect;
2119 use teaql_dialect_sqlite::SqliteDialect;
2120 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2121 use teaql_sql::SqlDialect;
2122 use tokio::runtime::Handle;
2123 use tokio::task::block_in_place;
2124
    // Double-quoted, comma-separated column list for `id`, `version` and `name`.
    // NOTE(review): presumably the projection text expected in compiled `Order` SQL; the
    // uses of this constant are outside this excerpt — confirm against the tests below it.
    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2126
2127 fn entity() -> EntityDescriptor {
2128 EntityDescriptor::new("Order")
2129 .table_name("orders")
2130 .property(
2131 PropertyDescriptor::new("id", DataType::U64)
2132 .column_name("id")
2133 .id()
2134 .not_null(),
2135 )
2136 .property(
2137 PropertyDescriptor::new("version", DataType::I64)
2138 .column_name("version")
2139 .version()
2140 .not_null(),
2141 )
2142 .property(
2143 PropertyDescriptor::new("name", DataType::Text)
2144 .column_name("name")
2145 .not_null(),
2146 )
2147 .relation(
2148 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2149 .local_key("id")
2150 .foreign_key("order_id")
2151 .many(),
2152 )
2153 }
2154
2155 fn sqlite_entity_keep_missing() -> EntityDescriptor {
2156 EntityDescriptor::new("Order")
2157 .table_name("orders")
2158 .property(
2159 PropertyDescriptor::new("id", DataType::U64)
2160 .column_name("id")
2161 .id()
2162 .not_null(),
2163 )
2164 .property(
2165 PropertyDescriptor::new("version", DataType::I64)
2166 .column_name("version")
2167 .version()
2168 .not_null(),
2169 )
2170 .property(
2171 PropertyDescriptor::new("name", DataType::Text)
2172 .column_name("name")
2173 .not_null(),
2174 )
2175 .relation(
2176 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2177 .local_key("id")
2178 .foreign_key("order_id")
2179 .many()
2180 .keep_missing(),
2181 )
2182 }
2183
2184 fn line_entity() -> EntityDescriptor {
2185 EntityDescriptor::new("OrderLine")
2186 .table_name("orderline")
2187 .property(
2188 PropertyDescriptor::new("id", DataType::U64)
2189 .column_name("id")
2190 .id()
2191 .not_null(),
2192 )
2193 .property(
2194 PropertyDescriptor::new("version", DataType::I64)
2195 .column_name("version")
2196 .version(),
2197 )
2198 .property(
2199 PropertyDescriptor::new("order_id", DataType::U64)
2200 .column_name("order_id")
2201 .not_null(),
2202 )
2203 .property(
2204 PropertyDescriptor::new("name", DataType::Text)
2205 .column_name("name")
2206 .not_null(),
2207 )
2208 .property(
2209 PropertyDescriptor::new("product_id", DataType::U64)
2210 .column_name("product_id")
2211 .not_null(),
2212 )
2213 .relation(
2214 teaql_core::RelationDescriptor::new("product", "Product")
2215 .local_key("product_id")
2216 .foreign_key("id"),
2217 )
2218 }
2219
2220 fn product_entity() -> EntityDescriptor {
2221 EntityDescriptor::new("Product")
2222 .table_name("product")
2223 .property(
2224 PropertyDescriptor::new("id", DataType::U64)
2225 .column_name("id")
2226 .id()
2227 .not_null(),
2228 )
2229 .property(
2230 PropertyDescriptor::new("name", DataType::Text)
2231 .column_name("name")
2232 .not_null(),
2233 )
2234 }
2235
2236 fn typed_entity() -> EntityDescriptor {
2237 EntityDescriptor::new("TypedValue")
2238 .table_name("typed_value")
2239 .property(
2240 PropertyDescriptor::new("id", DataType::U64)
2241 .column_name("id")
2242 .id()
2243 .not_null(),
2244 )
2245 .property(
2246 PropertyDescriptor::new("payload", DataType::Json)
2247 .column_name("payload")
2248 .not_null(),
2249 )
2250 .property(
2251 PropertyDescriptor::new("amount", DataType::Decimal)
2252 .column_name("amount")
2253 .not_null(),
2254 )
2255 .property(
2256 PropertyDescriptor::new("birthday", DataType::Date)
2257 .column_name("birthday")
2258 .not_null(),
2259 )
2260 .property(
2261 PropertyDescriptor::new("happened_at", DataType::Timestamp)
2262 .column_name("happened_at")
2263 .not_null(),
2264 )
2265 }
2266
    // Behavior marker whose `RepositoryBehavior` impl asks for the `lines` relation.
    struct OrderBehavior;
    // Behavior marker whose `RepositoryBehavior` impl asks for the nested `lines.product` path.
    struct NestedOrderBehavior;
2269
    // Typed row for the `Product` entity (table `product`), generated through the
    // `TeaqlEntity` derive; mirrors the columns declared by `product_entity()`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Product", table = "product")]
    struct ProductEntityRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        name: String,
    }
2277
    // Typed row for the `OrderLine` entity (table `orderline`), generated through the
    // `TeaqlEntity` derive. Unlike `line_entity()`, it declares no version field.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "OrderLine", table = "orderline")]
    struct OrderLineEntityRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        #[teaql(column = "order_id")]
        order_id: u64,
        name: String,
        #[teaql(column = "product_id")]
        product_id: u64,
        // Single-valued relation to `Product`, joined from `product_id` to `id`; optional
        // because the field type is `Option`.
        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
        product: Option<ProductEntityRow>,
    }
2291
    // Typed row for the `Order` entity (table `orders`) that also carries its `lines`
    // children, generated through the `TeaqlEntity` derive.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct OrderAggregateRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        // Flagged as the version field.
        #[teaql(version)]
        version: i64,
        name: String,
        // To-many relation to `OrderLine`, joined from `id` to `order_id`.
        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
        lines: teaql_core::SmartList<OrderLineEntityRow>,
    }
2303
    // Flat typed row for the `Order` entity (table `orders`) with no relation fields,
    // generated through the `TeaqlEntity` derive.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct Order {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        // Flagged as the version field.
        #[teaql(version)]
        version: i64,
        name: String,
    }
2313
2314 impl RepositoryBehavior for OrderBehavior {
2315 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2316 vec!["lines".to_owned()]
2317 }
2318 }
2319
2320 impl RepositoryBehavior for NestedOrderBehavior {
2321 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2322 vec!["lines.product".to_owned()]
2323 }
2324 }
2325
2326 #[derive(Clone)]
2327 struct SqliteSyncExecutor {
2328 inner: SqliteMutationExecutor,
2329 }
2330
2331 impl SqliteSyncExecutor {
2332 fn new(inner: SqliteMutationExecutor) -> Self {
2333 Self { inner }
2334 }
2335 }
2336
2337 impl QueryExecutor for SqliteSyncExecutor {
2338 type Error = MutationExecutorError;
2339
2340 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2341 let handle = Handle::current();
2342 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2343 }
2344
2345 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2346 let handle = Handle::current();
2347 block_in_place(|| handle.block_on(self.inner.execute(query)))
2348 }
2349
2350 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2351 let handle = Handle::current();
2352 block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2353 Ok(GraphTransactionBoundary::Started)
2354 }
2355
2356 fn commit_transaction(&self) -> Result<(), Self::Error> {
2357 let handle = Handle::current();
2358 block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2359 }
2360
2361 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2362 let handle = Handle::current();
2363 block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2364 }
2365 }
2366
2367 #[derive(Clone)]
2368 struct PgSyncExecutor {
2369 inner: PgMutationExecutor,
2370 }
2371
2372 impl PgSyncExecutor {
2373 fn new(inner: PgMutationExecutor) -> Self {
2374 Self { inner }
2375 }
2376 }
2377
2378 impl QueryExecutor for PgSyncExecutor {
2379 type Error = MutationExecutorError;
2380
2381 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2382 let handle = Handle::current();
2383 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2384 }
2385
2386 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2387 let handle = Handle::current();
2388 block_in_place(|| handle.block_on(self.inner.execute(query)))
2389 }
2390 }
2391
2392 #[derive(Clone)]
2393 struct PgTxSyncExecutor {
2394 inner: PgTransactionExecutor,
2395 }
2396
2397 impl PgTxSyncExecutor {
2398 fn new(inner: PgTransactionExecutor) -> Self {
2399 Self { inner }
2400 }
2401 }
2402
2403 impl QueryExecutor for PgTxSyncExecutor {
2404 type Error = MutationExecutorError;
2405
2406 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2407 let handle = Handle::current();
2408 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2409 }
2410
2411 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2412 let handle = Handle::current();
2413 block_in_place(|| handle.block_on(self.inner.execute(query)))
2414 }
2415
2416 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2417 Ok(GraphTransactionBoundary::AlreadyActive)
2418 }
2419
2420 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2421 let handle = Handle::current();
2422 block_in_place(|| handle.block_on(self.inner.rollback()))
2423 }
2424 }
2425
2426 #[tokio::test]
2427 async fn sqlite_executor_runs_crud_flow() {
2428 use sqlx::sqlite::SqlitePoolOptions;
2429
2430 let pool = SqlitePoolOptions::new()
2431 .max_connections(1)
2432 .connect("sqlite::memory:")
2433 .await
2434 .unwrap();
2435
2436 let executor = SqliteMutationExecutor::new(pool.clone());
2437 let dialect = SqliteDialect;
2438 let entity = entity();
2439 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2440
2441 let insert = dialect
2442 .compile_insert(
2443 &entity,
2444 &InsertCommand::new("Order")
2445 .value("id", 1_u64)
2446 .value("version", 1_i64)
2447 .value("name", "first"),
2448 )
2449 .unwrap();
2450 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2451
2452 let select = dialect
2453 .compile_select(
2454 &entity,
2455 &SelectQuery::new("Order")
2456 .project("id")
2457 .project("version")
2458 .project("name")
2459 .filter(Expr::eq("id", 1_u64)),
2460 )
2461 .unwrap();
2462 let rows = executor.fetch_all(&select).await.unwrap();
2463 assert_eq!(rows.len(), 1);
2464 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2465 assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2466 assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2467
2468 let update = dialect
2469 .compile_update(
2470 &entity,
2471 &UpdateCommand::new("Order", 1_u64)
2472 .expected_version(1)
2473 .value("name", "second"),
2474 )
2475 .unwrap();
2476 assert_eq!(executor.execute(&update).await.unwrap(), 1);
2477
2478 let after_update = executor.fetch_all(&select).await.unwrap();
2479 assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2480 assert_eq!(
2481 after_update[0].get("name"),
2482 Some(&Value::Text("second".to_owned()))
2483 );
2484
2485 let delete = dialect
2486 .compile_delete(
2487 &entity,
2488 &DeleteCommand::new("Order", 1_u64).expected_version(2),
2489 )
2490 .unwrap();
2491 assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2492
2493 let after_delete = executor.fetch_all(&select).await.unwrap();
2494 assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2495
2496 let recover = dialect
2497 .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2498 .unwrap();
2499 assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2500
2501 let after_recover = executor.fetch_all(&select).await.unwrap();
2502 assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2503 }
2504
2505 #[tokio::test(flavor = "multi_thread")]
2506 async fn sqlite_executor_enhances_relations() {
2507 use sqlx::sqlite::SqlitePoolOptions;
2508
2509 let pool = SqlitePoolOptions::new()
2510 .max_connections(1)
2511 .connect("sqlite::memory:")
2512 .await
2513 .unwrap();
2514
2515 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2516 let order = entity();
2517 let line = line_entity();
2518 mutation_executor
2519 .ensure_schema(&SqliteDialect, &[&order, &line])
2520 .await
2521 .unwrap();
2522
2523 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2524 .execute(&pool)
2525 .await
2526 .unwrap();
2527 sqlx::query(
2528 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2529 (101, 1, 1001, 'l1'),
2530 (102, 1, 1002, 'l2'),
2531 (201, 2, 1003, 'l3')",
2532 )
2533 .execute(&pool)
2534 .await
2535 .unwrap();
2536
2537 let executor = SqliteSyncExecutor::new(mutation_executor);
2538 let mut ctx = UserContext::new()
2539 .with_metadata(
2540 InMemoryMetadataStore::new()
2541 .with_entity(order)
2542 .with_entity(line)
2543 .with_entity(product_entity()),
2544 )
2545 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2546 .with_repository_behavior_registry(
2547 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2548 );
2549 ctx.insert_resource(SqliteDialect);
2550 ctx.insert_resource(executor);
2551
2552 let repo = ctx
2553 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2554 .unwrap();
2555 let mut parents = repo
2556 .fetch_all(
2557 &repo
2558 .select()
2559 .project("id")
2560 .project("version")
2561 .project("name")
2562 .order_by(teaql_core::OrderBy::asc("id")),
2563 )
2564 .unwrap();
2565
2566 repo.enhance_relations(&mut parents).unwrap();
2567
2568 assert_eq!(parents.len(), 2);
2569 match parents[0].get("lines") {
2570 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2571 other => panic!("unexpected first lines payload: {other:?}"),
2572 }
2573 match parents[1].get("lines") {
2574 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2575 other => panic!("unexpected second lines payload: {other:?}"),
2576 }
2577 }
2578
2579 #[tokio::test(flavor = "multi_thread")]
2580 async fn sqlite_executor_enhances_nested_relations() {
2581 use sqlx::sqlite::SqlitePoolOptions;
2582
2583 let pool = SqlitePoolOptions::new()
2584 .max_connections(1)
2585 .connect("sqlite::memory:")
2586 .await
2587 .unwrap();
2588
2589 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2590 let order = entity();
2591 let line = line_entity();
2592 let product = product_entity();
2593 mutation_executor
2594 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2595 .await
2596 .unwrap();
2597
2598 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2599 .execute(&pool)
2600 .await
2601 .unwrap();
2602 sqlx::query(
2603 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2604 (101, 1, 1001, 'l1'),
2605 (102, 1, 1002, 'l2')",
2606 )
2607 .execute(&pool)
2608 .await
2609 .unwrap();
2610 sqlx::query(
2611 "INSERT INTO product (id, name) VALUES
2612 (1001, 'p1'),
2613 (1002, 'p2')",
2614 )
2615 .execute(&pool)
2616 .await
2617 .unwrap();
2618
2619 let executor = SqliteSyncExecutor::new(mutation_executor);
2620 let mut ctx = UserContext::new()
2621 .with_metadata(
2622 InMemoryMetadataStore::new()
2623 .with_entity(order)
2624 .with_entity(line)
2625 .with_entity(product),
2626 )
2627 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2628 .with_repository_behavior_registry(
2629 InMemoryRepositoryBehaviorRegistry::new()
2630 .with_behavior("Order", NestedOrderBehavior),
2631 );
2632 ctx.insert_resource(SqliteDialect);
2633 ctx.insert_resource(executor);
2634
2635 let repo = ctx
2636 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2637 .unwrap();
2638 let mut parents = repo
2639 .fetch_all(
2640 &repo
2641 .select()
2642 .project("id")
2643 .project("version")
2644 .project("name"),
2645 )
2646 .unwrap();
2647
2648 repo.enhance_relations(&mut parents).unwrap();
2649
2650 match parents[0].get("lines") {
2651 Some(Value::List(lines)) => {
2652 assert_eq!(lines.len(), 2);
2653 for line in lines {
2654 match line {
2655 Value::Object(line_record) => match line_record.get("product") {
2656 Some(Value::Object(product)) => {
2657 assert!(product.get("name").is_some());
2658 }
2659 other => panic!("unexpected product payload: {other:?}"),
2660 },
2661 other => panic!("unexpected line payload: {other:?}"),
2662 }
2663 }
2664 }
2665 other => panic!("unexpected nested lines payload: {other:?}"),
2666 }
2667 }
2668
2669 #[tokio::test(flavor = "multi_thread")]
2670 async fn sqlite_executor_enhances_query_relation_with_child_query() {
2671 use sqlx::sqlite::SqlitePoolOptions;
2672
2673 let pool = SqlitePoolOptions::new()
2674 .max_connections(1)
2675 .connect("sqlite::memory:")
2676 .await
2677 .unwrap();
2678
2679 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2680 let order = entity();
2681 let line = line_entity();
2682 let product = product_entity();
2683 mutation_executor
2684 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2685 .await
2686 .unwrap();
2687
2688 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2689 .execute(&pool)
2690 .await
2691 .unwrap();
2692 sqlx::query(
2693 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2694 (101, 1, 1001, 'keep'),
2695 (102, 1, 1002, 'drop')",
2696 )
2697 .execute(&pool)
2698 .await
2699 .unwrap();
2700 sqlx::query(
2701 "INSERT INTO product (id, name) VALUES
2702 (1001, 'p1'),
2703 (1002, 'p2')",
2704 )
2705 .execute(&pool)
2706 .await
2707 .unwrap();
2708
2709 let executor = SqliteSyncExecutor::new(mutation_executor);
2710 let mut ctx = UserContext::new()
2711 .with_metadata(
2712 InMemoryMetadataStore::new()
2713 .with_entity(order)
2714 .with_entity(line)
2715 .with_entity(product),
2716 )
2717 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2718 ctx.insert_resource(SqliteDialect);
2719 ctx.insert_resource(executor);
2720
2721 let repo = ctx
2722 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2723 .unwrap();
2724 let query = repo.select().relation_query(
2725 "lines",
2726 SelectQuery::new("OrderLine")
2727 .project("name")
2728 .filter(Expr::eq("name", "keep"))
2729 .order_desc("id")
2730 .page(0, 10)
2731 .relation_query("product", SelectQuery::new("Product").project("name")),
2732 );
2733 let mut parents = repo
2734 .fetch_all(
2735 &repo
2736 .select()
2737 .project("id")
2738 .project("version")
2739 .project("name"),
2740 )
2741 .unwrap();
2742
2743 repo.enhance_query_relations(&mut parents, &query).unwrap();
2744
2745 match parents[0].get("lines") {
2746 Some(Value::List(lines)) => {
2747 assert_eq!(lines.len(), 1);
2748 let Value::Object(line) = &lines[0] else {
2749 panic!("unexpected line payload: {:?}", lines[0]);
2750 };
2751 assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2752 assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2753 assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2754 match line.get("product") {
2755 Some(Value::Object(product)) => {
2756 assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2757 assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2758 }
2759 other => panic!("unexpected product payload: {other:?}"),
2760 }
2761 }
2762 other => panic!("unexpected query relation payload: {other:?}"),
2763 }
2764 }
2765
2766 #[tokio::test]
2767 async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2768 use sqlx::Row;
2769 use sqlx::sqlite::SqlitePoolOptions;
2770
2771 let pool = SqlitePoolOptions::new()
2772 .max_connections(1)
2773 .connect("sqlite::memory:")
2774 .await
2775 .unwrap();
2776
2777 sqlx::query(
2778 "CREATE TABLE orders (
2779 id INTEGER PRIMARY KEY
2780 )",
2781 )
2782 .execute(&pool)
2783 .await
2784 .unwrap();
2785
2786 let executor = SqliteMutationExecutor::new(pool.clone());
2787 let dialect = SqliteDialect;
2788 let entity = entity();
2789
2790 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2791
2792 let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2793 .fetch_all(&pool)
2794 .await
2795 .unwrap()
2796 .into_iter()
2797 .map(|row| row.try_get::<String, _>("name").unwrap())
2798 .collect::<Vec<_>>();
2799
2800 assert!(columns.contains(&"id".to_owned()));
2801 assert!(columns.contains(&"version".to_owned()));
2802 assert!(columns.contains(&"name".to_owned()));
2803 }
2804
2805 #[tokio::test]
2806 async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2807 use sqlx::Row;
2808 use sqlx::sqlite::SqlitePoolOptions;
2809
2810 let pool = SqlitePoolOptions::new()
2811 .max_connections(1)
2812 .connect("sqlite::memory:")
2813 .await
2814 .unwrap();
2815
2816 let module = super::RuntimeModule::new()
2817 .descriptor(entity())
2818 .descriptor(line_entity())
2819 .descriptor(product_entity())
2820 .initial_graph(
2821 GraphNode::new("Order")
2822 .value("id", 1_u64)
2823 .value("version", 1_i64)
2824 .value("name", "seed-order"),
2825 );
2826 let mut ctx = super::UserContext::new().with_module(module);
2827 ctx.insert_resource(SqliteDialect);
2828 ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2829
2830 ctx.ensure_sqlite_schema().await.unwrap();
2831 sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2832 .execute(&pool)
2833 .await
2834 .unwrap();
2835 ctx.ensure_sqlite_schema().await.unwrap();
2836
2837 let tables = sqlx::query(
2838 "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2839 )
2840 .fetch_all(&pool)
2841 .await
2842 .unwrap()
2843 .into_iter()
2844 .map(|row| row.try_get::<String, _>("name").unwrap())
2845 .collect::<Vec<_>>();
2846
2847 assert_eq!(
2848 tables,
2849 vec![
2850 "orderline".to_owned(),
2851 "orders".to_owned(),
2852 "product".to_owned()
2853 ]
2854 );
2855
2856 let seed_count: i64 =
2857 sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2858 .fetch_one(&pool)
2859 .await
2860 .unwrap();
2861 assert_eq!(seed_count, 1);
2862 }
2863
2864 #[tokio::test]
2865 async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2866 use sqlx::sqlite::SqlitePoolOptions;
2867
2868 let pool = SqlitePoolOptions::new()
2869 .max_connections(1)
2870 .connect("sqlite::memory:")
2871 .await
2872 .unwrap();
2873
2874 let executor = SqliteMutationExecutor::new(pool.clone());
2875 let dialect = SqliteDialect;
2876 let entity = typed_entity();
2877 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2878
2879 let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2880 let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2881 let payload = serde_json::json!({"name": "teaql", "count": 2});
2882 let amount = Decimal::new(12345, 2);
2883
2884 let insert = dialect
2885 .compile_insert(
2886 &entity,
2887 &InsertCommand::new("TypedValue")
2888 .value("id", 1_u64)
2889 .value("payload", payload.clone())
2890 .value("amount", amount)
2891 .value("birthday", birthday)
2892 .value("happened_at", happened_at),
2893 )
2894 .unwrap();
2895 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2896
2897 let select = dialect
2898 .compile_select(
2899 &entity,
2900 &SelectQuery::new("TypedValue")
2901 .project("id")
2902 .project("payload")
2903 .project("amount")
2904 .project("birthday")
2905 .project("happened_at")
2906 .filter(Expr::eq("id", 1_u64)),
2907 )
2908 .unwrap();
2909 let rows = executor.fetch_all(&select).await.unwrap();
2910 assert_eq!(rows.len(), 1);
2911 assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2912 assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2913 assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2914 assert_eq!(
2915 rows[0].get("happened_at"),
2916 Some(&Value::Timestamp(happened_at))
2917 );
2918 }
2919
2920 #[tokio::test(flavor = "multi_thread")]
2921 async fn sqlite_fetches_enhanced_typed_entities() {
2922 use sqlx::sqlite::SqlitePoolOptions;
2923
2924 let pool = SqlitePoolOptions::new()
2925 .max_connections(1)
2926 .connect("sqlite::memory:")
2927 .await
2928 .unwrap();
2929
2930 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2931 let order = entity();
2932 let line = line_entity();
2933 let product = product_entity();
2934 mutation_executor
2935 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2936 .await
2937 .unwrap();
2938
2939 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2940 .execute(&pool)
2941 .await
2942 .unwrap();
2943 sqlx::query(
2944 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2945 (101, 1, 1001, 'l1'),
2946 (102, 1, 1002, 'l2')",
2947 )
2948 .execute(&pool)
2949 .await
2950 .unwrap();
2951 sqlx::query(
2952 "INSERT INTO product (id, name) VALUES
2953 (1001, 'p1'),
2954 (1002, 'p2')",
2955 )
2956 .execute(&pool)
2957 .await
2958 .unwrap();
2959
2960 let executor = SqliteSyncExecutor::new(mutation_executor);
2961 let mut ctx = UserContext::new()
2962 .with_metadata(
2963 InMemoryMetadataStore::new()
2964 .with_entity(order)
2965 .with_entity(line)
2966 .with_entity(product),
2967 )
2968 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2969 .with_repository_behavior_registry(
2970 InMemoryRepositoryBehaviorRegistry::new()
2971 .with_behavior("Order", NestedOrderBehavior),
2972 );
2973 ctx.insert_resource(SqliteDialect);
2974 ctx.insert_resource(executor);
2975
2976 let repo = ctx
2977 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2978 .unwrap();
2979 let rows = repo
2980 .fetch_enhanced_entities::<OrderAggregateRow>(
2981 &repo
2982 .select()
2983 .project("id")
2984 .project("version")
2985 .project("name"),
2986 )
2987 .unwrap();
2988
2989 assert_eq!(rows.len(), 1);
2990 let order = rows.first().unwrap();
2991 assert_eq!(order.id, 1);
2992 assert_eq!(order.lines.len(), 2);
2993 assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2994 assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2995 }
2996
2997 #[tokio::test(flavor = "multi_thread")]
2998 async fn sqlite_fetches_typed_smart_list_entities() {
2999 use sqlx::sqlite::SqlitePoolOptions;
3000
3001 let pool = SqlitePoolOptions::new()
3002 .max_connections(1)
3003 .connect("sqlite::memory:")
3004 .await
3005 .unwrap();
3006
3007 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3008 let order = entity();
3009 mutation_executor
3010 .ensure_schema(&SqliteDialect, &[&order])
3011 .await
3012 .unwrap();
3013
3014 sqlx::query(
3015 "INSERT INTO orders (id, version, name) VALUES
3016 (1, 1, 'o1'),
3017 (2, 3, 'o2')",
3018 )
3019 .execute(&pool)
3020 .await
3021 .unwrap();
3022
3023 let executor = SqliteSyncExecutor::new(mutation_executor);
3024 let mut ctx = UserContext::new()
3025 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3026 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3027 ctx.insert_resource(SqliteDialect);
3028 ctx.insert_resource(executor);
3029
3030 let repo = ctx
3031 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3032 .unwrap();
3033 let rows = repo
3034 .fetch_entities::<OrderAggregateRow>(
3035 &repo
3036 .select()
3037 .project("id")
3038 .project("version")
3039 .project("name"),
3040 )
3041 .unwrap();
3042
3043 assert_eq!(rows.len(), 2);
3044 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3045 assert_eq!(rows.versions(), vec![1, 3]);
3046 assert!(rows.data[0].lines.is_empty());
3047 assert!(rows.data[1].lines.is_empty());
3048 }
3049
3050 #[tokio::test(flavor = "multi_thread")]
3051 async fn sqlite_fetches_smart_list_of_order_entities() {
3052 use sqlx::sqlite::SqlitePoolOptions;
3053
3054 let pool = SqlitePoolOptions::new()
3055 .max_connections(1)
3056 .connect("sqlite::memory:")
3057 .await
3058 .unwrap();
3059
3060 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3061 let order = entity();
3062 mutation_executor
3063 .ensure_schema(&SqliteDialect, &[&order])
3064 .await
3065 .unwrap();
3066
3067 sqlx::query(
3068 "INSERT INTO orders (id, version, name) VALUES
3069 (1, 1, 'o1'),
3070 (2, 3, 'o2')",
3071 )
3072 .execute(&pool)
3073 .await
3074 .unwrap();
3075
3076 let executor = SqliteSyncExecutor::new(mutation_executor);
3077 let mut ctx = UserContext::new()
3078 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3079 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3080 ctx.insert_resource(SqliteDialect);
3081 ctx.insert_resource(executor);
3082
3083 let repo = ctx
3084 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3085 .unwrap();
3086 let rows = repo
3087 .fetch_entities::<Order>(
3088 &repo
3089 .select()
3090 .project("id")
3091 .project("version")
3092 .project("name"),
3093 )
3094 .unwrap();
3095
3096 assert_eq!(
3097 rows.data,
3098 vec![
3099 Order {
3100 id: 1,
3101 version: 1,
3102 name: "o1".to_owned(),
3103 },
3104 Order {
3105 id: 2,
3106 version: 3,
3107 name: "o2".to_owned(),
3108 }
3109 ]
3110 );
3111 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3112 assert_eq!(rows.versions(), vec![1, 3]);
3113 }
3114
3115 #[tokio::test(flavor = "multi_thread")]
3116 async fn sqlite_insert_generates_missing_id() {
3117 use sqlx::sqlite::SqlitePoolOptions;
3118
3119 let pool = SqlitePoolOptions::new()
3120 .max_connections(1)
3121 .connect("sqlite::memory:")
3122 .await
3123 .unwrap();
3124
3125 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3126 let order = entity();
3127 mutation_executor
3128 .ensure_schema(&SqliteDialect, &[&order])
3129 .await
3130 .unwrap();
3131
3132 let executor = SqliteSyncExecutor::new(mutation_executor);
3133 let mut ctx = UserContext::new()
3134 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3135 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3136 ctx.insert_resource(SqliteDialect);
3137 ctx.insert_resource(executor);
3138
3139 let repo = ctx
3140 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3141 .unwrap();
3142 let affected = repo
3143 .insert(
3144 &repo
3145 .insert_command()
3146 .value("version", 1_i64)
3147 .value("name", "generated"),
3148 )
3149 .unwrap();
3150 assert_eq!(affected, 1);
3151
3152 let rows = repo
3153 .fetch_entities::<Order>(
3154 &repo
3155 .select()
3156 .project("id")
3157 .project("version")
3158 .project("name")
3159 .filter(Expr::eq("name", "generated")),
3160 )
3161 .unwrap();
3162 assert_eq!(rows.len(), 1);
3163 let order = rows.first().unwrap();
3164 assert!(order.id > 0);
3165 assert_eq!(order.version, 1);
3166 assert_eq!(order.name, "generated");
3167 }
3168
3169 #[tokio::test(flavor = "multi_thread")]
3170 async fn sqlite_save_graph_inserts_nested_rows() {
3171 use sqlx::{Row, sqlite::SqlitePoolOptions};
3172
3173 let pool = SqlitePoolOptions::new()
3174 .max_connections(1)
3175 .connect("sqlite::memory:?cache=shared")
3176 .await
3177 .unwrap();
3178
3179 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3180 let order = entity();
3181 let line = line_entity();
3182 let product = product_entity();
3183 mutation_executor
3184 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3185 .await
3186 .unwrap();
3187
3188 let executor = SqliteSyncExecutor::new(mutation_executor);
3189 let mut ctx = UserContext::new()
3190 .with_metadata(
3191 InMemoryMetadataStore::new()
3192 .with_entity(order)
3193 .with_entity(line)
3194 .with_entity(product),
3195 )
3196 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3197 ctx.insert_resource(SqliteDialect);
3198 ctx.insert_resource(executor);
3199
3200 let repo = ctx
3201 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3202 .unwrap();
3203 let graph = GraphNode::new("Order")
3204 .value("id", 1_u64)
3205 .value("version", 1_i64)
3206 .value("name", "root")
3207 .relation(
3208 "lines",
3209 GraphNode::new("OrderLine")
3210 .value("id", 10_u64)
3211 .value("name", "line-1")
3212 .relation(
3213 "product",
3214 GraphNode::new("Product")
3215 .value("id", 100_u64)
3216 .value("name", "sku-1"),
3217 ),
3218 );
3219
3220 let saved = repo.save_graph(graph).unwrap();
3221 assert_eq!(
3222 saved.relations["lines"][0].values.get("order_id"),
3223 Some(&Value::U64(1))
3224 );
3225 assert_eq!(
3226 saved.relations["lines"][0].values.get("product_id"),
3227 Some(&Value::U64(100))
3228 );
3229
3230 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3231 .fetch_one(&pool)
3232 .await
3233 .unwrap()
3234 .try_get("count")
3235 .unwrap();
3236 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3237 .fetch_one(&pool)
3238 .await
3239 .unwrap();
3240 let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3241 .fetch_one(&pool)
3242 .await
3243 .unwrap()
3244 .try_get("count")
3245 .unwrap();
3246
3247 assert_eq!(order_count, 1);
3248 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3249 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3250 assert_eq!(product_count, 1);
3251 }
3252
3253 #[tokio::test(flavor = "multi_thread")]
3254 async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3255 use sqlx::{Row, sqlite::SqlitePoolOptions};
3256
3257 let pool = SqlitePoolOptions::new()
3258 .max_connections(1)
3259 .connect("sqlite::memory:")
3260 .await
3261 .unwrap();
3262
3263 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3264 let order = entity();
3265 let line = line_entity();
3266 let product = product_entity();
3267 mutation_executor
3268 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3269 .await
3270 .unwrap();
3271
3272 let executor = SqliteSyncExecutor::new(mutation_executor);
3273 let mut ctx = UserContext::new()
3274 .with_metadata(
3275 InMemoryMetadataStore::new()
3276 .with_entity(order)
3277 .with_entity(line)
3278 .with_entity(product),
3279 )
3280 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3281 ctx.insert_resource(SqliteDialect);
3282 ctx.insert_resource(executor);
3283
3284 let repo = ctx
3285 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3286 .unwrap();
3287 let saved = repo
3288 .save_entity_graph(OrderAggregateRow {
3289 id: 2,
3290 version: 1,
3291 name: "typed-root".to_owned(),
3292 lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3293 id: 20,
3294 order_id: 0,
3295 name: "typed-line".to_owned(),
3296 product_id: 200,
3297 product: Some(ProductEntityRow {
3298 id: 200,
3299 name: "typed-sku".to_owned(),
3300 }),
3301 }]),
3302 })
3303 .unwrap();
3304
3305 assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3306 assert_eq!(
3307 saved.relations["lines"][0].values.get("order_id"),
3308 Some(&Value::U64(2))
3309 );
3310 assert_eq!(
3311 saved.relations["lines"][0].values.get("product_id"),
3312 Some(&Value::U64(200))
3313 );
3314
3315 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3316 .fetch_one(&pool)
3317 .await
3318 .unwrap()
3319 .try_get("count")
3320 .unwrap();
3321 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3322 .fetch_one(&pool)
3323 .await
3324 .unwrap();
3325 let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3326 .fetch_one(&pool)
3327 .await
3328 .unwrap();
3329
3330 assert_eq!(order_count, 1);
3331 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3332 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3333 assert_eq!(product_name, "typed-sku");
3334 }
3335
3336 #[tokio::test(flavor = "multi_thread")]
3337 async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3338 use sqlx::sqlite::SqlitePoolOptions;
3339 use std::time::{SystemTime, UNIX_EPOCH};
3340
3341 let db_path = std::env::temp_dir().join(format!(
3342 "teaql-plan-{}-{}.db",
3343 std::process::id(),
3344 SystemTime::now()
3345 .duration_since(UNIX_EPOCH)
3346 .unwrap()
3347 .as_nanos()
3348 ));
3349 let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3350 let pool = SqlitePoolOptions::new()
3351 .max_connections(1)
3352 .connect(&db_url)
3353 .await
3354 .unwrap();
3355
3356 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3357 let order = entity();
3358 let line = line_entity();
3359 let product = product_entity();
3360 mutation_executor
3361 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3362 .await
3363 .unwrap();
3364
3365 sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3366 .execute(&pool)
3367 .await
3368 .unwrap();
3369 sqlx::query(
3370 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3371 (200, 1, 100, 301, 'line-a'),
3372 (201, 1, 100, 302, 'line-b')",
3373 )
3374 .execute(&pool)
3375 .await
3376 .unwrap();
3377 let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3378 .fetch_one(&pool)
3379 .await
3380 .unwrap();
3381 assert_eq!(seeded_lines, 2);
3382
3383 let executor = SqliteSyncExecutor::new(mutation_executor);
3384 let mut ctx = UserContext::new()
3385 .with_metadata(
3386 InMemoryMetadataStore::new()
3387 .with_entity(order)
3388 .with_entity(line)
3389 .with_entity(product),
3390 )
3391 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3392 .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3393 ctx.insert_resource(SqliteDialect);
3394 ctx.insert_resource(executor);
3395
3396 let graph = GraphNode::new("Order")
3397 .value("id", 100_u64)
3398 .value("version", 1_i64)
3399 .value("name", "existing-updated")
3400 .relation(
3401 "lines",
3402 GraphNode::new("OrderLine")
3403 .value("name", "new-line-a")
3404 .value("product_id", 401_u64),
3405 )
3406 .relation(
3407 "lines",
3408 GraphNode::new("OrderLine")
3409 .value("name", "new-line-b")
3410 .value("product_id", 402_u64),
3411 )
3412 .relation(
3413 "lines",
3414 GraphNode::new("OrderLine")
3415 .value("id", 200_u64)
3416 .value("version", 1_i64)
3417 .value("name", "line-a-updated"),
3418 )
3419 .relation(
3420 "lines",
3421 GraphNode::new("OrderLine")
3422 .value("id", 201_u64)
3423 .value("version", 1_i64)
3424 .value("name", "line-b-updated"),
3425 );
3426
3427 let plan = ctx
3428 .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3429 .unwrap();
3430 let counts = plan.grouped_counts();
3431 assert_eq!(
3432 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3433 Some(&1)
3434 );
3435 assert_eq!(
3436 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3437 Some(&2)
3438 );
3439 assert_eq!(
3440 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3441 Some(&2)
3442 );
3443
3444 let create_batch = plan
3445 .batches
3446 .iter()
3447 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3448 .unwrap();
3449 assert_eq!(create_batch.items.len(), 2);
3450 assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3451 assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3452
3453 let update_batch = plan
3454 .batches
3455 .iter()
3456 .find(|batch| {
3457 batch.entity == "OrderLine"
3458 && batch.kind == GraphMutationKind::Update
3459 && batch.update_fields == vec!["name".to_owned()]
3460 })
3461 .unwrap();
3462 assert_eq!(update_batch.items.len(), 2);
3463
3464 let repo = ctx
3465 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3466 .unwrap();
3467 let saved = repo.execute_graph_plan(plan).unwrap();
3468 let lines = saved.relations.get("lines").unwrap();
3469 assert_eq!(lines.len(), 4);
3470
3471 let generated_count: i64 = sqlx::query_scalar(
3472 "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3473 )
3474 .fetch_one(&pool)
3475 .await
3476 .unwrap();
3477 assert_eq!(generated_count, 2);
3478 let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3479 .fetch_one(&pool)
3480 .await
3481 .unwrap();
3482 assert_eq!(updated_name, "line-a-updated");
3483 pool.close().await;
3484 let _ = std::fs::remove_file(db_path);
3485 }
3486
3487 #[tokio::test(flavor = "multi_thread")]
3488 async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3489 use sqlx::{Row, sqlite::SqlitePoolOptions};
3490
3491 let pool = SqlitePoolOptions::new()
3492 .max_connections(1)
3493 .connect("sqlite::memory:")
3494 .await
3495 .unwrap();
3496
3497 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3498 let order = entity();
3499 let line = line_entity();
3500 let product = product_entity();
3501 mutation_executor
3502 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3503 .await
3504 .unwrap();
3505
3506 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3507 .execute(&pool)
3508 .await
3509 .unwrap();
3510 sqlx::query(
3511 "INSERT INTO product (id, name) VALUES
3512 (100, 'old-sku'),
3513 (101, 'removed-sku')",
3514 )
3515 .execute(&pool)
3516 .await
3517 .unwrap();
3518 sqlx::query(
3519 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3520 (10, 1, 100, 'old-line'),
3521 (11, 1, 101, 'removed-line')",
3522 )
3523 .execute(&pool)
3524 .await
3525 .unwrap();
3526 sqlx::query("UPDATE orderline SET version = 1")
3527 .execute(&pool)
3528 .await
3529 .unwrap();
3530
3531 let executor = SqliteSyncExecutor::new(mutation_executor);
3532 let mut ctx = UserContext::new()
3533 .with_metadata(
3534 InMemoryMetadataStore::new()
3535 .with_entity(order)
3536 .with_entity(line)
3537 .with_entity(product),
3538 )
3539 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3540 ctx.insert_resource(SqliteDialect);
3541 ctx.insert_resource(executor);
3542
3543 let repo = ctx
3544 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3545 .unwrap();
3546 let graph = GraphNode::new("Order")
3547 .value("id", 1_u64)
3548 .value("version", 1_i64)
3549 .value("name", "new-root")
3550 .relation(
3551 "lines",
3552 GraphNode::new("OrderLine")
3553 .value("id", 10_u64)
3554 .value("version", 1_i64)
3555 .value("name", "new-line")
3556 .relation(
3557 "product",
3558 GraphNode::new("Product")
3559 .value("id", 100_u64)
3560 .value("name", "new-sku"),
3561 ),
3562 )
3563 .relation(
3564 "lines",
3565 GraphNode::new("OrderLine")
3566 .value("id", 12_u64)
3567 .value("version", 1_i64)
3568 .value("name", "added-line")
3569 .relation(
3570 "product",
3571 GraphNode::new("Product")
3572 .value("id", 102_u64)
3573 .value("name", "added-sku"),
3574 ),
3575 );
3576
3577 let saved = repo.save_graph(graph).unwrap();
3578 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3579
3580 let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3581 .fetch_one(&pool)
3582 .await
3583 .unwrap();
3584 assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3585 assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3586
3587 let updated_line =
3588 sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3589 .fetch_one(&pool)
3590 .await
3591 .unwrap();
3592 assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3593 assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3594 assert_eq!(
3595 updated_line.try_get::<String, _>("name").unwrap(),
3596 "new-line"
3597 );
3598
3599 let added_line =
3600 sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3601 .fetch_one(&pool)
3602 .await
3603 .unwrap();
3604 assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3605 assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3606 assert_eq!(
3607 added_line.try_get::<String, _>("name").unwrap(),
3608 "added-line"
3609 );
3610
3611 let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3612 .fetch_one(&pool)
3613 .await
3614 .unwrap();
3615 assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3616
3617 let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3618 .fetch_one(&pool)
3619 .await
3620 .unwrap();
3621 assert_eq!(
3622 updated_product.try_get::<String, _>("name").unwrap(),
3623 "new-sku"
3624 );
3625 }
3626
3627 #[tokio::test(flavor = "multi_thread")]
3628 async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3629 use sqlx::{Row, sqlite::SqlitePoolOptions};
3630
3631 let pool = SqlitePoolOptions::new()
3632 .max_connections(1)
3633 .connect("sqlite::memory:")
3634 .await
3635 .unwrap();
3636
3637 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3638 let order = sqlite_entity_keep_missing();
3639 let line = line_entity();
3640 let product = product_entity();
3641 mutation_executor
3642 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3643 .await
3644 .unwrap();
3645
3646 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3647 .execute(&pool)
3648 .await
3649 .unwrap();
3650 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3651 .execute(&pool)
3652 .await
3653 .unwrap();
3654 sqlx::query(
3655 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3656 (10, 1, 1, 100, 'remove-me'),
3657 (11, 1, 1, 100, 'keep-me')",
3658 )
3659 .execute(&pool)
3660 .await
3661 .unwrap();
3662
3663 let executor = SqliteSyncExecutor::new(mutation_executor);
3664 let mut ctx = UserContext::new()
3665 .with_metadata(
3666 InMemoryMetadataStore::new()
3667 .with_entity(order)
3668 .with_entity(line)
3669 .with_entity(product),
3670 )
3671 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3672 ctx.insert_resource(SqliteDialect);
3673 ctx.insert_resource(executor);
3674
3675 let repo = ctx
3676 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3677 .unwrap();
3678 let graph = GraphNode::new("Order")
3679 .value("id", 1_u64)
3680 .value("version", 1_i64)
3681 .value("name", "root-updated")
3682 .relation(
3683 "lines",
3684 GraphNode::new("OrderLine")
3685 .value("id", 10_u64)
3686 .value("version", 1_i64)
3687 .remove(),
3688 )
3689 .relation(
3690 "lines",
3691 GraphNode::new("OrderLine")
3692 .value("id", 12_u64)
3693 .value("version", 1_i64)
3694 .value("name", "new-reference-line")
3695 .relation(
3696 "product",
3697 GraphNode::new("Product").value("id", 100_u64).reference(),
3698 ),
3699 );
3700
3701 let saved = repo.save_graph(graph).unwrap();
3702 assert_eq!(
3703 saved.relations["lines"][1].relations["product"][0]
3704 .values
3705 .get("name"),
3706 Some(&Value::Text("reference-only".to_owned()))
3707 );
3708
3709 let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3710 .fetch_one(&pool)
3711 .await
3712 .unwrap();
3713 assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3714
3715 let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3716 .fetch_one(&pool)
3717 .await
3718 .unwrap();
3719 assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3720 assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3721
3722 let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3723 .fetch_one(&pool)
3724 .await
3725 .unwrap();
3726 assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3727
3728 let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3729 .fetch_one(&pool)
3730 .await
3731 .unwrap();
3732 assert_eq!(
3733 product.try_get::<String, _>("name").unwrap(),
3734 "reference-only"
3735 );
3736 }
3737
3738 #[tokio::test(flavor = "multi_thread")]
3739 async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3740 use sqlx::sqlite::SqlitePoolOptions;
3741
3742 let pool = SqlitePoolOptions::new()
3743 .max_connections(1)
3744 .connect("sqlite::memory:")
3745 .await
3746 .unwrap();
3747
3748 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3749 let order = entity();
3750 let line = line_entity();
3751 let product = product_entity();
3752 mutation_executor
3753 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3754 .await
3755 .unwrap();
3756
3757 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3758 .execute(&pool)
3759 .await
3760 .unwrap();
3761 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3762 .execute(&pool)
3763 .await
3764 .unwrap();
3765 sqlx::query(
3766 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3767 (10, 1, 1, 100, 'line-10')",
3768 )
3769 .execute(&pool)
3770 .await
3771 .unwrap();
3772
3773 let executor = SqliteSyncExecutor::new(mutation_executor);
3774 let mut ctx = UserContext::new()
3775 .with_metadata(
3776 InMemoryMetadataStore::new()
3777 .with_entity(order)
3778 .with_entity(line)
3779 .with_entity(product),
3780 )
3781 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3782 ctx.insert_resource(SqliteDialect);
3783 ctx.insert_resource(executor);
3784
3785 let repo = ctx
3786 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3787 .unwrap();
3788
3789 let missing_reference = repo.save_graph(
3790 GraphNode::new("Order").value("id", 1_u64).relation(
3791 "lines",
3792 GraphNode::new("OrderLine")
3793 .value("id", 12_u64)
3794 .value("version", 1_i64)
3795 .value("name", "bad-reference")
3796 .relation(
3797 "product",
3798 GraphNode::new("Product").value("id", 999_u64).reference(),
3799 ),
3800 ),
3801 );
3802 assert!(format!("{missing_reference:?}").contains("does not exist"));
3803
3804 let mutable_reference = repo.save_graph(
3805 GraphNode::new("Order").value("id", 1_u64).relation(
3806 "lines",
3807 GraphNode::new("OrderLine")
3808 .value("id", 12_u64)
3809 .value("version", 1_i64)
3810 .value("name", "mutable-reference")
3811 .relation(
3812 "product",
3813 GraphNode::new("Product")
3814 .value("id", 100_u64)
3815 .value("name", "should-not-mutate")
3816 .reference(),
3817 ),
3818 ),
3819 );
3820 assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3821
3822 let duplicate_child = repo.save_graph(
3823 GraphNode::new("Order")
3824 .value("id", 1_u64)
3825 .relation(
3826 "lines",
3827 GraphNode::new("OrderLine")
3828 .value("id", 10_u64)
3829 .value("version", 1_i64)
3830 .reference(),
3831 )
3832 .relation(
3833 "lines",
3834 GraphNode::new("OrderLine")
3835 .value("id", 10_u64)
3836 .value("version", 1_i64)
3837 .reference(),
3838 ),
3839 );
3840 assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3841
3842 let reference_version_conflict = repo.save_graph(
3843 GraphNode::new("Order").value("id", 1_u64).relation(
3844 "lines",
3845 GraphNode::new("OrderLine")
3846 .value("id", 10_u64)
3847 .value("version", 2_i64)
3848 .reference(),
3849 ),
3850 );
3851 assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3852
3853 let remove_with_child = repo.save_graph(
3854 GraphNode::new("Order").value("id", 1_u64).relation(
3855 "lines",
3856 GraphNode::new("OrderLine")
3857 .value("id", 10_u64)
3858 .value("version", 1_i64)
3859 .relation(
3860 "product",
3861 GraphNode::new("Product").value("id", 100_u64).reference(),
3862 )
3863 .remove(),
3864 ),
3865 );
3866 assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3867
3868 let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3869 assert!(format!("{remove:?}").contains("does not exist"));
3870 }
3871
3872 #[tokio::test(flavor = "multi_thread")]
3873 async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3874 use sqlx::{Row, sqlite::SqlitePoolOptions};
3875
3876 let pool = SqlitePoolOptions::new()
3877 .max_connections(1)
3878 .connect("sqlite::memory:")
3879 .await
3880 .unwrap();
3881
3882 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3883 let order = entity();
3884 let line = line_entity();
3885 let product = product_entity();
3886 mutation_executor
3887 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3888 .await
3889 .unwrap();
3890
3891 let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3892 let mut ctx = UserContext::new()
3893 .with_metadata(
3894 InMemoryMetadataStore::new()
3895 .with_entity(order)
3896 .with_entity(line)
3897 .with_entity(product),
3898 )
3899 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3900 ctx.insert_resource(SqliteDialect);
3901 ctx.insert_resource(executor);
3902
3903 let repo = ctx
3904 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3905 .unwrap();
3906 let error = repo.save_graph(
3907 GraphNode::new("Order")
3908 .value("id", 1_u64)
3909 .value("version", 1_i64)
3910 .value("name", "rollback-root")
3911 .relation(
3912 "lines",
3913 GraphNode::new("OrderLine")
3914 .value("id", 10_u64)
3915 .value("version", 1_i64)
3916 .value("name", "rollback-line")
3917 .value("product_id", 999_u64)
3918 .relation(
3919 "product",
3920 GraphNode::new("Product").value("id", 999_u64).reference(),
3921 ),
3922 ),
3923 );
3924 assert!(format!("{error:?}").contains("does not exist"));
3925
3926 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3927 .fetch_one(&pool)
3928 .await
3929 .unwrap()
3930 .try_get("count")
3931 .unwrap();
3932 assert_eq!(count, 0);
3933 }
3934
3935 #[tokio::test]
3936 async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3937 use sqlx::{PgPool, Row};
3938
3939 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3940 return;
3941 };
3942
3943 let pool = PgPool::connect(&database_url).await.unwrap();
3944 sqlx::query("DROP TABLE IF EXISTS orderline")
3945 .execute(&pool)
3946 .await
3947 .unwrap();
3948 sqlx::query("DROP TABLE IF EXISTS orders")
3949 .execute(&pool)
3950 .await
3951 .unwrap();
3952
3953 let executor = PgMutationExecutor::new(pool.clone());
3954 let dialect = PostgresDialect;
3955 let order = entity();
3956 let line = line_entity();
3957
3958 executor
3959 .ensure_schema(&dialect, &[&order, &line])
3960 .await
3961 .unwrap();
3962
3963 let tables = sqlx::query(
3964 "SELECT table_name
3965 FROM information_schema.tables
3966 WHERE table_schema = current_schema()
3967 AND table_name IN ('orders', 'orderline')
3968 ORDER BY table_name",
3969 )
3970 .fetch_all(&pool)
3971 .await
3972 .unwrap()
3973 .into_iter()
3974 .map(|row| row.try_get::<String, _>("table_name").unwrap())
3975 .collect::<Vec<_>>();
3976 assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3977
3978 let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3979 .fetch_one(&pool)
3980 .await
3981 .unwrap();
3982 assert_eq!(soundex, "R163");
3983
3984 sqlx::query(
3985 "INSERT INTO orders (id, version, name) VALUES
3986 (1, 1, 'draft'),
3987 (2, 1, 'submitted'),
3988 (3, 1, 'archived')",
3989 )
3990 .execute(&pool)
3991 .await
3992 .unwrap();
3993
3994 let array_bound_query = dialect
3995 .compile_select(
3996 &order,
3997 &SelectQuery::new("Order")
3998 .filter(
3999 Expr::in_large(
4000 "id",
4001 vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4002 )
4003 .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4004 )
4005 .order_asc("id"),
4006 )
4007 .unwrap();
4008 assert_eq!(
4009 array_bound_query.sql,
4010 format!(
4011 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4012 )
4013 );
4014 let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4015 assert_eq!(rows.len(), 2);
4016 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4017 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4018
4019 sqlx::query(
4020 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4021 (11, 1, 1, 101, 'line-1'),
4022 (12, 1, 2, 102, 'line-2'),
4023 (13, 1, 3, 103, 'archived-line')",
4024 )
4025 .execute(&pool)
4026 .await
4027 .unwrap();
4028
4029 let subquery = dialect
4030 .compile_select(
4031 &order,
4032 &SelectQuery::new("Order")
4033 .filter(Expr::in_subquery(
4034 "id",
4035 line.clone(),
4036 SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4037 "order_id",
4038 ))
4039 .order_asc("id"),
4040 )
4041 .unwrap();
4042 assert_eq!(
4043 subquery.sql,
4044 format!(
4045 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4046 )
4047 );
4048 let rows = executor.fetch_all(&subquery).await.unwrap();
4049 assert_eq!(rows.len(), 2);
4050 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4051 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4052
4053 let projected = dialect
4054 .compile_select(
4055 &order,
4056 &SelectQuery::new("Order")
4057 .project("id")
4058 .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4059 .order_gbk_asc("name")
4060 .limit(1),
4061 )
4062 .unwrap();
4063 assert_eq!(
4064 projected.sql,
4065 "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4066 );
4067 let rows = executor.fetch_all(&projected).await.unwrap();
4068 assert_eq!(rows.len(), 1);
4069 assert!(rows[0].contains_key("nameSound"));
4070
4071 let aggregate = dialect
4072 .compile_select(
4073 &order,
4074 &SelectQuery::new("Order")
4075 .count("total")
4076 .stddev("version", "stddevVersion")
4077 .var_pop("version", "varPopVersion")
4078 .bit_or("version", "bitOrVersion")
4079 .having(Expr::binary(
4080 Expr::count_all(),
4081 teaql_core::BinaryOp::Gt,
4082 Expr::value(2_i64),
4083 )),
4084 )
4085 .unwrap();
4086 assert_eq!(
4087 aggregate.sql,
4088 "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4089 );
4090 let rows = executor.fetch_all(&aggregate).await.unwrap();
4091 assert_eq!(rows.len(), 1);
4092 assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4093 assert!(matches!(
4094 rows[0].get("stddevVersion"),
4095 Some(Value::Decimal(_))
4096 ));
4097 assert!(matches!(
4098 rows[0].get("varPopVersion"),
4099 Some(Value::Decimal(_))
4100 ));
4101 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4102
4103 sqlx::query("DROP TABLE orderline")
4104 .execute(&pool)
4105 .await
4106 .unwrap();
4107 sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4108 .execute(&pool)
4109 .await
4110 .unwrap();
4111
4112 executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4113
4114 let columns = sqlx::query(
4115 "SELECT column_name
4116 FROM information_schema.columns
4117 WHERE table_schema = current_schema()
4118 AND table_name = 'orderline'
4119 ORDER BY column_name",
4120 )
4121 .fetch_all(&pool)
4122 .await
4123 .unwrap()
4124 .into_iter()
4125 .map(|row| row.try_get::<String, _>("column_name").unwrap())
4126 .collect::<Vec<_>>();
4127 assert!(columns.contains(&"id".to_owned()));
4128 assert!(columns.contains(&"order_id".to_owned()));
4129 assert!(columns.contains(&"product_id".to_owned()));
4130 assert!(columns.contains(&"name".to_owned()));
4131
4132 sqlx::query("DROP TABLE IF EXISTS orderline")
4133 .execute(&pool)
4134 .await
4135 .unwrap();
4136 sqlx::query("DROP TABLE IF EXISTS orders")
4137 .execute(&pool)
4138 .await
4139 .unwrap();
4140 }
4141
4142 #[tokio::test(flavor = "multi_thread")]
4143 async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4144 use sqlx::{PgPool, Row};
4145
4146 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4147 return;
4148 };
4149
4150 let pool = PgPool::connect(&database_url).await.unwrap();
4151 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4152 .execute(&pool)
4153 .await
4154 .unwrap();
4155 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4156 .execute(&pool)
4157 .await
4158 .unwrap();
4159
4160 let order = entity().table_name("orders_idgen");
4161 let executor = PgMutationExecutor::new(pool.clone());
4162 executor
4163 .ensure_schema(&PostgresDialect, &[&order])
4164 .await
4165 .unwrap();
4166
4167 let mut ctx = UserContext::new()
4168 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4169 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4170 .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4171 ctx.insert_resource(PostgresDialect);
4172 ctx.insert_resource(PgSyncExecutor::new(executor));
4173
4174 let repo = ctx
4175 .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4176 .unwrap();
4177 repo.insert(
4178 &repo
4179 .insert_command()
4180 .value("version", 1_i64)
4181 .value("name", "generated-1"),
4182 )
4183 .unwrap();
4184 repo.insert(
4185 &repo
4186 .insert_command()
4187 .value("version", 1_i64)
4188 .value("name", "generated-2"),
4189 )
4190 .unwrap();
4191
4192 let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4193 .fetch_all(&pool)
4194 .await
4195 .unwrap()
4196 .into_iter()
4197 .map(|row| row.try_get::<i64, _>("id").unwrap())
4198 .collect::<Vec<_>>();
4199 assert_eq!(ids, vec![1, 2]);
4200
4201 let current: i64 = sqlx::query_scalar(
4202 "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4203 )
4204 .fetch_one(&pool)
4205 .await
4206 .unwrap();
4207 assert_eq!(current, 2);
4208
4209 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4210 .execute(&pool)
4211 .await
4212 .unwrap();
4213 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4214 .execute(&pool)
4215 .await
4216 .unwrap();
4217 }
4218
4219 #[tokio::test]
4220 async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4221 use sqlx::PgPool;
4222
4223 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4224 return;
4225 };
4226
4227 let pool = PgPool::connect(&database_url).await.unwrap();
4228 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4229 .execute(&pool)
4230 .await
4231 .unwrap();
4232
4233 let order = entity().table_name("orders_agg");
4234 let executor = PgMutationExecutor::new(pool.clone());
4235 let dialect = PostgresDialect;
4236 executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4237
4238 sqlx::query(
4239 "INSERT INTO orders_agg (id, version, name) VALUES
4240 (1, 1, 'A'),
4241 (2, 2, 'A'),
4242 (3, 3, 'B'),
4243 (4, 4, 'B')",
4244 )
4245 .execute(&pool)
4246 .await
4247 .unwrap();
4248
4249 let aggregate = dialect
4250 .compile_select(
4251 &order,
4252 &SelectQuery::new("Order")
4253 .count("total")
4254 .count_field("version", "versionCount")
4255 .sum("version", "sumVersion")
4256 .avg("version", "avgVersion")
4257 .min("version", "minVersion")
4258 .max("version", "maxVersion")
4259 .stddev("version", "stddevVersion")
4260 .stddev_pop("version", "stddevPopVersion")
4261 .var_samp("version", "varSampVersion")
4262 .var_pop("version", "varPopVersion")
4263 .bit_and("version", "bitAndVersion")
4264 .bit_or("version", "bitOrVersion")
4265 .bit_xor("version", "bitXorVersion")
4266 .having(Expr::binary(
4267 Expr::count_all(),
4268 teaql_core::BinaryOp::Gt,
4269 Expr::value(3_i64),
4270 )),
4271 )
4272 .unwrap();
4273 let rows = executor.fetch_all(&aggregate).await.unwrap();
4274 assert_eq!(rows.len(), 1);
4275 let row = &rows[0];
4276 assert_eq!(row.get("total"), Some(&Value::I64(4)));
4277 assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4278 assert_eq!(
4279 row.get("sumVersion"),
4280 Some(&Value::Decimal(Decimal::new(10, 0)))
4281 );
4282 assert_eq!(
4283 row.get("avgVersion"),
4284 Some(&Value::Decimal(Decimal::new(25, 1)))
4285 );
4286 assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4287 assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4288 assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4289 assert!(matches!(
4290 row.get("stddevPopVersion"),
4291 Some(Value::Decimal(_))
4292 ));
4293 assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4294 assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4295 assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4296 assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4297 assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4298
4299 let grouped = dialect
4300 .compile_select(
4301 &order,
4302 &SelectQuery::new("Order")
4303 .group_by("name")
4304 .count("total")
4305 .sum("version", "sumVersion")
4306 .having(Expr::binary(
4307 Expr::count_all(),
4308 teaql_core::BinaryOp::Gt,
4309 Expr::value(1_i64),
4310 ))
4311 .order_asc("name"),
4312 )
4313 .unwrap();
4314 let rows = executor.fetch_all(&grouped).await.unwrap();
4315 assert_eq!(rows.len(), 2);
4316 assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4317 assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4318 assert_eq!(
4319 rows[0].get("sumVersion"),
4320 Some(&Value::Decimal(Decimal::new(3, 0)))
4321 );
4322 assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4323 assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4324 assert_eq!(
4325 rows[1].get("sumVersion"),
4326 Some(&Value::Decimal(Decimal::new(7, 0)))
4327 );
4328
4329 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4330 .execute(&pool)
4331 .await
4332 .unwrap();
4333 }
4334
4335 #[tokio::test]
4336 async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4337 use sqlx::{PgPool, Row};
4338
4339 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4340 return;
4341 };
4342
4343 let pool = PgPool::connect(&database_url).await.unwrap();
4344 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4345 .execute(&pool)
4346 .await
4347 .unwrap();
4348 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4349 .execute(&pool)
4350 .await
4351 .unwrap();
4352 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4353 .execute(&pool)
4354 .await
4355 .unwrap();
4356
4357 let order = entity().table_name("orders_ctx");
4358 let line = line_entity().table_name("orderline_ctx");
4359 let product = product_entity().table_name("product_ctx");
4360 let module = super::RuntimeModule::new()
4361 .descriptor(order)
4362 .descriptor(line)
4363 .descriptor(product);
4364 let mut ctx = super::UserContext::new().with_module(module);
4365 ctx.insert_resource(PostgresDialect);
4366 ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4367
4368 ctx.ensure_postgres_schema().await.unwrap();
4369
4370 let tables = sqlx::query(
4371 "SELECT table_name
4372 FROM information_schema.tables
4373 WHERE table_schema = current_schema()
4374 AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4375 ORDER BY table_name",
4376 )
4377 .fetch_all(&pool)
4378 .await
4379 .unwrap()
4380 .into_iter()
4381 .map(|row| row.try_get::<String, _>("table_name").unwrap())
4382 .collect::<Vec<_>>();
4383
4384 assert_eq!(
4385 tables,
4386 vec![
4387 "orderline_ctx".to_owned(),
4388 "orders_ctx".to_owned(),
4389 "product_ctx".to_owned()
4390 ]
4391 );
4392
4393 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4394 .execute(&pool)
4395 .await
4396 .unwrap();
4397 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4398 .execute(&pool)
4399 .await
4400 .unwrap();
4401 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4402 .execute(&pool)
4403 .await
4404 .unwrap();
4405 }
4406
4407 #[tokio::test(flavor = "multi_thread")]
4408 async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4409 use sqlx::{PgPool, Row};
4410
4411 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4412 return;
4413 };
4414
4415 let pool = PgPool::connect(&database_url).await.unwrap();
4416 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4417 .execute(&pool)
4418 .await
4419 .unwrap();
4420 sqlx::query("DROP TABLE IF EXISTS product_tx")
4421 .execute(&pool)
4422 .await
4423 .unwrap();
4424 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4425 .execute(&pool)
4426 .await
4427 .unwrap();
4428
4429 let order = entity().table_name("orders_tx");
4430 let line = line_entity().table_name("orderline_tx");
4431 let product = product_entity().table_name("product_tx");
4432 let schema_executor = PgMutationExecutor::new(pool.clone());
4433 schema_executor
4434 .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4435 .await
4436 .unwrap();
4437
4438 let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4439 let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4440 let mut ctx = UserContext::new()
4441 .with_metadata(
4442 InMemoryMetadataStore::new()
4443 .with_entity(order)
4444 .with_entity(line)
4445 .with_entity(product),
4446 )
4447 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4448 ctx.insert_resource(PostgresDialect);
4449 ctx.insert_resource(sync_executor);
4450
4451 let repo = ctx
4452 .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4453 .unwrap();
4454 repo.save_graph(
4455 GraphNode::new("Order")
4456 .value("id", 1_u64)
4457 .value("version", 1_i64)
4458 .value("name", "pg-rollback-root")
4459 .relation(
4460 "lines",
4461 GraphNode::new("OrderLine")
4462 .value("id", 10_u64)
4463 .value("version", 1_i64)
4464 .value("name", "pg-rollback-line")
4465 .relation(
4466 "product",
4467 GraphNode::new("Product")
4468 .value("id", 100_u64)
4469 .value("name", "pg-rollback-sku"),
4470 ),
4471 ),
4472 )
4473 .unwrap();
4474
4475 tx_executor.rollback().await.unwrap();
4476
4477 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4478 .fetch_one(&pool)
4479 .await
4480 .unwrap()
4481 .try_get("count")
4482 .unwrap();
4483 assert_eq!(count, 0);
4484
4485 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4486 .execute(&pool)
4487 .await
4488 .unwrap();
4489 sqlx::query("DROP TABLE IF EXISTS product_tx")
4490 .execute(&pool)
4491 .await
4492 .unwrap();
4493 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4494 .execute(&pool)
4495 .await
4496 .unwrap();
4497 }
4498}
4499pub use checker::{
4500 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4501 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4502 mark_record_status,
4503};