1use std::sync::Arc;
2
3use teaql_core::{
4 AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10 clear_record_status, mark_record_status,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23 self.repository.metadata.context.repository_behavior(entity)
24 }
25
26 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27 self.repository
28 .metadata
29 .context
30 .repository_behavior(&self.entity)
31 }
32
33 pub fn entity(&self) -> &str {
34 &self.entity
35 }
36
37 pub fn select(&self) -> SelectQuery {
38 SelectQuery::new(self.entity.clone())
39 }
40
41 pub fn insert_command(&self) -> InsertCommand {
42 InsertCommand::new(self.entity.clone())
43 }
44
45 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47 policy.enforce_insert(self.repository.metadata.context, command)?;
48 }
49 Ok(())
50 }
51
52 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54 policy.enforce_update(self.repository.metadata.context, command)?;
55 }
56 Ok(())
57 }
58
59 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61 policy.enforce_delete(self.repository.metadata.context, command)?;
62 }
63 Ok(())
64 }
65
66 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68 policy.enforce_recover(self.repository.metadata.context, command)?;
69 }
70 Ok(())
71 }
72
73 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74 let mut query = query.clone();
75
76 let mut full_trace = self.trace_context.clone();
77 full_trace.extend(query.trace_chain);
78 query.trace_chain = full_trace;
79
80 if let Some(behavior) = self.query_behavior(&query.entity) {
81 behavior.before_select(self.repository.metadata.context, &mut query)?;
82 }
83 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84 policy.enforce_select(self.repository.metadata.context, &mut query)?;
85 }
86 Ok(query)
87 }
88
89 pub fn prepare_insert_command(
90 &self,
91 command: &InsertCommand,
92 ) -> Result<InsertCommand, RuntimeError> {
93 let mut command = command.clone();
94 if let Some(behavior) = self.behavior() {
95 behavior.before_insert(self.repository.metadata.context, &mut command)?;
96 }
97 self.enforce_insert_policy(&mut command)?;
98
99 let entity = self
100 .repository
101 .metadata
102 .context
103 .require_entity(&command.entity)?;
104 if let Some(id_property) = entity.id_property() {
105 let needs_id = !command.values.contains_key(&id_property.name)
106 || is_unassigned_id(command.values.get(&id_property.name));
107 if needs_id {
108 let id = self.repository.metadata.context.next_id(&command.entity)?;
109 command
110 .values
111 .insert(id_property.name.clone(), Value::U64(id));
112 }
113 }
114 ensure_initial_version(&mut command.values, entity);
115 mark_record_status(&mut command.values, CheckObjectStatus::Create);
116 let check_result = self
117 .repository
118 .metadata
119 .context
120 .check_and_fix_record(&command.entity, &mut command.values);
121 clear_record_status(&mut command.values);
122 check_result?;
123
124 Ok(command)
125 }
126
127 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128 UpdateCommand::new(self.entity.clone(), id)
129 }
130
131 pub fn prepare_update_command(
132 &self,
133 command: &UpdateCommand,
134 ) -> Result<UpdateCommand, RuntimeError> {
135 let mut command = command.clone();
136 if let Some(behavior) = self.behavior() {
137 behavior.before_update(self.repository.metadata.context, &mut command)?;
138 }
139 self.enforce_update_policy(&mut command)?;
140
141 Ok(command)
142 }
143
144 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145 DeleteCommand::new(self.entity.clone(), id)
146 }
147
148 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149 RecoverCommand::new(self.entity.clone(), id, expected_version)
150 }
151
152 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153 let query = self
154 .prepare_select_query(query)
155 .map_err(RepositoryError::Runtime)?;
156 self.fetch_prepared_all(&query).await
157 }
158
159 async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
160 let mut rows = self.fetch_prepared_query(query).await?;
161 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
162 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
163 self.enhance_query_relations(&mut rows, query).await?;
164 Ok(rows)
165 }
166
167 async fn fetch_prepared_query(
168 &self,
169 query: &SelectQuery,
170 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
171
172 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
173 let mut query = query.clone();
174 query.comment = final_comment;
175 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
176 if let Some(cache) = self
177 .repository
178 .metadata
179 .context
180 .get_resource::<Arc<dyn AggregationCacheBackend>>()
181 {
182 return self.fetch_prepared_query_with_cache(
183 &query,
184 options,
185 cache.as_ref(),
186 ).await;
187 }
188 if let Some(cache) = self
189 .repository
190 .metadata
191 .context
192 .get_resource::<InMemoryAggregationCache>()
193 {
194 return self.fetch_prepared_query_with_cache(&query, options, cache).await;
195 }
196 }
197 let request = teaql_data_service::QueryRequest {
198 query: query.clone(),
199 trace_chain: query.trace_chain.clone(),
200 comment: query.comment.clone(),
201 };
202 let res = self
203 .repository
204 .executor
205 .query(request)
206 .await.map_err(RepositoryError::Executor)?;
207 self.repository.metadata.context.record_metadata_log(&res.metadata);
208 Ok(res.rows)
209 }
210
211 async fn fetch_prepared_query_with_cache(
212 &self,
213 query: &SelectQuery,
214 options: AggregationCacheOptions,
215 cache: &dyn AggregationCacheBackend,
216 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
217
218 let key = aggregation_cache_key(
219 cache.namespace(),
220 &aggregation_cache_namespace(&query.entity),
221 query,
222 );
223 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
224 return Ok(rows);
225 }
226 let request = teaql_data_service::QueryRequest {
227 query: query.clone(),
228 trace_chain: query.trace_chain.clone(),
229 comment: query.comment.clone(),
230 };
231 let res = self
232 .repository
233 .executor
234 .query(request)
235 .await.map_err(RepositoryError::Executor)?;
236 self.repository.metadata.context.record_metadata_log(&res.metadata);
237 let rows = res.rows;
238 cache.put(key, rows.clone());
239 Ok(rows)
240 }
241
242 pub async fn fetch_all_with_relation_aggregates(
243 &self,
244 query: &SelectQuery,
245 relation_aggregates: &[RelationAggregate],
246 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
247 let query = self
248 .prepare_select_query(query)
249 .map_err(RepositoryError::Runtime)?;
250
251 let mut rows = self.fetch_prepared_all(&query).await?;
252 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
253 Ok(rows)
254 }
255
256 pub async fn fetch_smart_list(
257 &self,
258 query: &SelectQuery,
259 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
260 let query = self
261 .prepare_select_query(query)
262 .map_err(RepositoryError::Runtime)?;
263
264 self.repository.fetch_smart_list(&query).await
265 }
266
267 pub async fn fetch_smart_list_with_relation_aggregates(
268 &self,
269 query: &SelectQuery,
270 relation_aggregates: &[RelationAggregate],
271 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
272 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
273 .map(SmartList::from)
274 }
275
276 pub async fn fetch_entities<T>(
277 &self,
278 query: &SelectQuery,
279 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
280 where
281 T: Entity,
282 {
283 let query = self
284 .prepare_select_query(query)
285 .map_err(RepositoryError::Runtime)?;
286
287 self.repository.fetch_entities(&query).await
288 }
289
290 pub async fn fetch_entities_with_relation_aggregates<T>(
291 &self,
292 query: &SelectQuery,
293 relation_aggregates: &[RelationAggregate],
294 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
295 where
296 T: Entity,
297 {
298 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
299 .into_iter()
300 .map(|record| {
301 let mut entity = T::from_record(record)?;
302 let root = crate::EntityRoot::default();
303 entity.on_loaded(&root as &dyn std::any::Any);
304 Ok(entity)
305 })
306 .collect::<Result<Vec<_>, _>>()
307 .map(SmartList::from)
308 .map_err(RepositoryError::Entity)
309 }
310
311 pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
312 &self,
313 query: &SelectQuery,
314 relation_aggregates: &[RelationAggregate],
315 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
316 where
317 T: Entity,
318 {
319 let query = self
320 .prepare_select_query(query)
321 .map_err(RepositoryError::Runtime)?;
322
323 let mut rows = self.fetch_prepared_all(&query).await?;
324 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
325 self.enhance_query_relations(&mut rows, &query).await?;
326 self.enhance_relations(&mut rows).await?;
327 rows.into_iter()
328 .map(|record| {
329 let mut entity = T::from_record(record)?;
330 let root = crate::EntityRoot::default();
331 entity.on_loaded(&root as &dyn std::any::Any);
332 Ok(entity)
333 })
334 .collect::<Result<Vec<_>, _>>()
335 .map(SmartList::from)
336 .map_err(RepositoryError::Entity)
337 }
338
339 pub async fn fetch_enhanced_entities<T>(
340 &self,
341 query: &SelectQuery,
342 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
343 where
344 T: Entity,
345 {
346 let query = self
347 .prepare_select_query(query)
348 .map_err(RepositoryError::Runtime)?;
349
350 let mut rows = self.fetch_prepared_all(&query).await?;
351 self.enhance_query_relations(&mut rows, &query).await?;
352 self.enhance_relations(&mut rows).await?;
353 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
354
355 rows.into_iter()
356 .map(|record| {
357 let mut entity = T::from_record(record)?;
358 if let Some(ref root) = root {
359 entity.on_loaded(root as &dyn std::any::Any);
360 }
361 Ok(entity)
362 })
363 .collect::<Result<Vec<_>, _>>()
364 .map(SmartList::from)
365 .map_err(RepositoryError::Entity)
366 }
367
368 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
369 let mut command = command.clone();
370 if let Some(behavior) = self.behavior() {
371 behavior
372 .before_insert(self.repository.metadata.context, &mut command)
373 .map_err(RepositoryError::Runtime)?;
374 }
375 self.enforce_insert_policy(&mut command).map_err(RepositoryError::Runtime)?;
376 self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
377 }
378
379 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
380 let mut command = command.clone();
381 if let Some(behavior) = self.behavior() {
382 behavior
383 .before_update(self.repository.metadata.context, &mut command)
384 .map_err(RepositoryError::Runtime)?;
385 }
386 self.enforce_update_policy(&mut command).map_err(RepositoryError::Runtime)?;
387 self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
388 }
389
390 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
391 self.delete_scoped(command, self.trace_context.clone()).await
392 }
393
394 pub async fn delete_scoped(
395 &self,
396 command: &DeleteCommand,
397 trace_chain: Vec<teaql_core::TraceNode>,
398 ) -> Result<u64, RepositoryError<E::Error>> {
399 let mut command = command.clone();
400 command.trace_chain = trace_chain.clone();
401 if let Some(behavior) = self.behavior() {
402 behavior
403 .before_delete(self.repository.metadata.context, &mut command)
404 .map_err(RepositoryError::Runtime)?;
405 }
406 self.enforce_delete_policy(&mut command)
407 .map_err(RepositoryError::Runtime)?;
408
409 let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
410 let affected = self.repository.delete(&command).await?;
411
412 let mut event = EntityEvent::deleted_with_old_values(
413 command.entity,
414 command.id,
415 command.expected_version,
416 old_values,
417 );
418 event.trace_chain = trace_chain;
419 self.emit_event(event)
420 .map_err(RepositoryError::Runtime)?;
421 Ok(affected)
422 }
423
424 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
425 let mut command = command.clone();
426 command.trace_chain = self.trace_context.clone();
427 if let Some(behavior) = self.behavior() {
428 behavior
429 .before_recover(self.repository.metadata.context, &mut command)
430 .map_err(RepositoryError::Runtime)?;
431 }
432 self.enforce_recover_policy(&mut command)
433 .map_err(RepositoryError::Runtime)?;
434 let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
435 let affected = self.repository.recover(&command).await?;
436 let event = EntityEvent::recovered_with_old_values(
437 command.entity,
438 command.id,
439 command.expected_version,
440 old_values,
441 );
442 self.emit_event(event)
443 .map_err(RepositoryError::Runtime)?;
444 Ok(affected)
445 }
446
447 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
448 self.repository.metadata.context.send_event(event)
449 }
450
451 #[allow(dead_code)]
452 pub(super) async fn execute_prepared_insert(
453 &self,
454 command: InsertCommand,
455 ) -> Result<u64, RepositoryError<E::Error>> {
456 self.execute_prepared_insert_with_comment(command, Vec::new()).await
457 }
458
459 pub(super) async fn execute_prepared_insert_with_comment(
460 &self,
461 mut command: InsertCommand,
462 trace_chain: Vec<teaql_core::TraceNode>,
463 ) -> Result<u64, RepositoryError<E::Error>> {
464 command.trace_chain = trace_chain.clone();
465 let affected = self.repository.insert(&command).await?;
466 let mut event = EntityEvent::created(command.entity, command.values);
467 event.trace_chain = trace_chain;
468 self.emit_event(event).map_err(RepositoryError::Runtime)?;
469 Ok(affected)
470 }
471
472 pub(super) async fn execute_prepared_batch_insert(
473 &self,
474 command: teaql_core::BatchInsertCommand,
475 ) -> Result<u64, RepositoryError<E::Error>> {
476 if command.batch_values.is_empty() {
477 return Ok(0);
478 }
479 let affected = self.repository.batch_insert(&command).await?;
480
481 let entity = command.entity.clone();
482 for (i, values) in command.batch_values.into_iter().enumerate() {
483 let mut event = EntityEvent::created(entity.clone(), values);
484 if i < command.trace_chains.len() {
485 event.trace_chain = command.trace_chains[i].clone();
486 }
487 self.emit_event(event).map_err(RepositoryError::Runtime)?;
488 }
489 Ok(affected)
490 }
491
492 #[allow(dead_code)]
493 pub(super) async fn execute_prepared_update(
494 &self,
495 command: UpdateCommand,
496 ) -> Result<u64, RepositoryError<E::Error>> {
497 self.execute_prepared_update_with_comment(command, Vec::new()).await
498 }
499
500 pub(super) async fn execute_prepared_update_with_comment(
501 &self,
502 mut command: UpdateCommand,
503 trace_chain: Vec<teaql_core::TraceNode>,
504 ) -> Result<u64, RepositoryError<E::Error>> {
505 command.trace_chain = trace_chain.clone();
506
507 let mut old_values = command.old_values.clone();
508 let needs_fetch = match &old_values {
509 Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
510 None => true,
511 };
512 if needs_fetch {
513 old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
514 }
515
516 let affected = self.repository.update(&command).await?;
517 let updated_fields = command.values.keys().cloned().collect();
518 let mut values = command.values.clone();
519 values.insert("id".to_owned(), command.id.clone());
520 if let Some(version) = command.expected_version {
521 values.insert("version".to_owned(), Value::I64(version + 1));
522 }
523 let mut new_values = old_values.clone().unwrap_or_default();
524 for (field, value) in &values {
525 new_values.insert(field.clone(), value.clone());
526 }
527 let mut event = EntityEvent::updated_with_old_values(
528 command.entity,
529 values,
530 old_values,
531 new_values,
532 updated_fields,
533 );
534 event.trace_chain = trace_chain;
535 self.emit_event(event).map_err(RepositoryError::Runtime)?;
536 Ok(affected)
537 }
538
539 pub(super) async fn execute_prepared_batch_update(
540 &self,
541 command: teaql_core::BatchUpdateCommand,
542 ) -> Result<u64, RepositoryError<E::Error>> {
543 if command.batch_values.is_empty() {
544 return Ok(0);
545 }
546 let affected = self.repository.batch_update(&command).await?;
547
548 let entity = command.entity.clone();
549 for (i, values) in command.batch_values.into_iter().enumerate() {
550 let mut full_values = values.clone();
551 full_values.insert("id".to_owned(), command.batch_ids[i].clone());
552 if let Some(Some(version)) = command.batch_expected_versions.get(i) {
553 full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
554 }
555
556 let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
557 let mut new_values = old_values.clone().unwrap_or_default();
558 for (field, value) in &full_values {
559 new_values.insert(field.clone(), value.clone());
560 }
561
562 let mut event = EntityEvent::updated_with_old_values(
563 entity.clone(),
564 full_values,
565 old_values,
566 new_values,
567 command.update_fields.clone(),
568 );
569 if i < command.trace_chains.len() {
570 event.trace_chain = command.trace_chains[i].clone();
571 }
572 self.emit_event(event).map_err(RepositoryError::Runtime)?;
573 }
574 Ok(affected)
575 }
576
577 fn fetch_current_event_row(
578 &self,
579 _entity: &str,
580 _id: &Value,
581 _trace_chain: Vec<teaql_core::TraceNode>,
582 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
583 Ok(None)
586 }
587
588
589 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
590 ResolvedRepository {
591 entity,
592 repository: ContextRepository {
593 metadata: UserContextMetadata {
594 context: self.repository.metadata.context,
595 },
596 executor: self.repository.executor,
597 },
598 trace_context: Vec::new(),
599 }
600 }
601}