1use std::sync::Arc;
2
3use teaql_core::{
4 AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9 CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10 clear_record_status, mark_record_status,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23 self.repository.metadata.context.repository_behavior(entity)
24 }
25
26 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27 self.repository
28 .metadata
29 .context
30 .repository_behavior(&self.entity)
31 }
32
33 pub fn entity(&self) -> &str {
34 &self.entity
35 }
36
37 pub fn select(&self) -> SelectQuery {
38 SelectQuery::new(self.entity.clone())
39 }
40
41 pub fn insert_command(&self) -> InsertCommand {
42 InsertCommand::new(self.entity.clone())
43 }
44
45 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47 policy.enforce_insert(self.repository.metadata.context, command)?;
48 }
49 Ok(())
50 }
51
52 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54 policy.enforce_update(self.repository.metadata.context, command)?;
55 }
56 Ok(())
57 }
58
59 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61 policy.enforce_delete(self.repository.metadata.context, command)?;
62 }
63 Ok(())
64 }
65
66 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68 policy.enforce_recover(self.repository.metadata.context, command)?;
69 }
70 Ok(())
71 }
72
73 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74 let mut query = query.clone();
75
76 let mut full_trace = self.trace_context.clone();
77 full_trace.extend(query.trace_chain);
78 query.trace_chain = full_trace;
79
80 if let Some(behavior) = self.query_behavior(&query.entity) {
81 behavior.before_select(self.repository.metadata.context, &mut query)?;
82 }
83 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84 policy.enforce_select(self.repository.metadata.context, &mut query)?;
85 }
86 if !query.relations.is_empty() {
89 if let Some(descriptor) = self.repository.metadata.context.entity(&query.entity) {
90 for load in &query.relations {
91 if let Some(relation) = descriptor.relation_by_name(&load.name) {
92 if !query.projection.contains(&relation.local_key) {
93 query.projection.push(relation.local_key.clone());
94 }
95 }
96 }
97 }
98 }
99 Ok(query)
100 }
101
102 pub fn prepare_insert_command(
103 &self,
104 command: &InsertCommand,
105 ) -> Result<InsertCommand, RuntimeError> {
106 let mut command = command.clone();
107 if let Some(behavior) = self.behavior() {
108 behavior.before_insert(self.repository.metadata.context, &mut command)?;
109 }
110 self.enforce_insert_policy(&mut command)?;
111
112 let entity = self
113 .repository
114 .metadata
115 .context
116 .require_entity(&command.entity)?;
117 if let Some(id_property) = entity.id_property() {
118 let needs_id = !command.values.contains_key(&id_property.name)
119 || is_unassigned_id(command.values.get(&id_property.name));
120 if needs_id {
121 let id = self.repository.metadata.context.next_id(&command.entity)?;
122 command
123 .values
124 .insert(id_property.name.clone(), Value::U64(id));
125 }
126 }
127 ensure_initial_version(&mut command.values, entity);
128 mark_record_status(&mut command.values, CheckObjectStatus::Create);
129 let check_result = self
130 .repository
131 .metadata
132 .context
133 .check_and_fix_record(&command.entity, &mut command.values);
134 clear_record_status(&mut command.values);
135 check_result?;
136
137 Ok(command)
138 }
139
140 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
141 UpdateCommand::new(self.entity.clone(), id)
142 }
143
144 pub fn prepare_update_command(
145 &self,
146 command: &UpdateCommand,
147 ) -> Result<UpdateCommand, RuntimeError> {
148 let mut command = command.clone();
149 if let Some(behavior) = self.behavior() {
150 behavior.before_update(self.repository.metadata.context, &mut command)?;
151 }
152 self.enforce_update_policy(&mut command)?;
153
154 Ok(command)
155 }
156
157 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
158 DeleteCommand::new(self.entity.clone(), id)
159 }
160
161 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
162 RecoverCommand::new(self.entity.clone(), id, expected_version)
163 }
164
165 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
166 let query = self
167 .prepare_select_query(query)
168 .map_err(RepositoryError::Runtime)?;
169 self.fetch_prepared_all(&query).await
170 }
171
172 pub async fn fetch_stream(
177 &self,
178 query: &SelectQuery,
179 ) -> Result<Vec<teaql_data_service::StreamChunk>, RepositoryError<E::Error>>
180 where
181 E: teaql_data_service::StreamQueryExecutor,
182 {
183 let query = self
184 .prepare_select_query(query)
185 .map_err(RepositoryError::Runtime)?;
186
187 let chunk_size = query.stream_config
188 .as_ref()
189 .map(|c| c.chunk_size)
190 .unwrap_or(1000);
191
192 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
193 let mut query = query.clone();
194 query.comment = final_comment;
195
196 let request = teaql_data_service::QueryRequest {
197 query: query.clone(),
198 trace_chain: query.trace_chain.clone(),
199 comment: query.comment.clone(),
200 };
201
202 let chunks = self
203 .repository
204 .executor
205 .query_stream(request, chunk_size)
206 .await
207 .map_err(RepositoryError::Executor)?;
208
209 let mut enhanced_chunks = Vec::with_capacity(chunks.len());
211 for mut chunk in chunks {
212 self.enhance_object_group_bys(&mut chunk.rows, &query.object_group_bys, &query.trace_chain).await?;
213 self.enhance_child_queries(&mut chunk.rows, &query.child_enhancements, &query.trace_chain).await?;
214 self.enhance_query_relations(&mut chunk.rows, &query).await?;
215 enhanced_chunks.push(chunk);
216 }
217
218 Ok(enhanced_chunks)
219 }
220
221 async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
222 let mut rows = self.fetch_prepared_query(query).await?;
223 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
224 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
225 self.enhance_query_relations(&mut rows, query).await?;
226 Ok(rows)
227 }
228
229 async fn fetch_prepared_query(
230 &self,
231 query: &SelectQuery,
232 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233
234 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
235 let mut query = query.clone();
236 query.comment = final_comment;
237 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
238 if let Some(cache) = self
239 .repository
240 .metadata
241 .context
242 .get_resource::<Arc<dyn AggregationCacheBackend>>()
243 {
244 return self.fetch_prepared_query_with_cache(
245 &query,
246 options,
247 cache.as_ref(),
248 ).await;
249 }
250 if let Some(cache) = self
251 .repository
252 .metadata
253 .context
254 .get_resource::<InMemoryAggregationCache>()
255 {
256 return self.fetch_prepared_query_with_cache(&query, options, cache).await;
257 }
258 }
259 let request = teaql_data_service::QueryRequest {
260 query: query.clone(),
261 trace_chain: query.trace_chain.clone(),
262 comment: query.comment.clone(),
263 };
264 let res = self
265 .repository
266 .executor
267 .query(request)
268 .await.map_err(RepositoryError::Executor)?;
269 self.repository.metadata.context.record_metadata_log(&res.metadata);
270 Ok(res.rows)
271 }
272
273 async fn fetch_prepared_query_with_cache(
274 &self,
275 query: &SelectQuery,
276 options: AggregationCacheOptions,
277 cache: &dyn AggregationCacheBackend,
278 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
279
280 let key = aggregation_cache_key(
281 cache.namespace(),
282 &aggregation_cache_namespace(&query.entity),
283 query,
284 );
285 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
286 return Ok(rows);
287 }
288 let request = teaql_data_service::QueryRequest {
289 query: query.clone(),
290 trace_chain: query.trace_chain.clone(),
291 comment: query.comment.clone(),
292 };
293 let res = self
294 .repository
295 .executor
296 .query(request)
297 .await.map_err(RepositoryError::Executor)?;
298 self.repository.metadata.context.record_metadata_log(&res.metadata);
299 let rows = res.rows;
300 cache.put(key, rows.clone());
301 Ok(rows)
302 }
303
304 pub async fn fetch_all_with_relation_aggregates(
305 &self,
306 query: &SelectQuery,
307 relation_aggregates: &[RelationAggregate],
308 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
309 let query = self
310 .prepare_select_query(query)
311 .map_err(RepositoryError::Runtime)?;
312
313 let mut rows = self.fetch_prepared_all(&query).await?;
314 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
315 Ok(rows)
316 }
317
318 pub async fn fetch_smart_list(
319 &self,
320 query: &SelectQuery,
321 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
322 let query = self
323 .prepare_select_query(query)
324 .map_err(RepositoryError::Runtime)?;
325
326 self.repository.fetch_smart_list(&query).await
327 }
328
329 pub async fn fetch_smart_list_with_relation_aggregates(
330 &self,
331 query: &SelectQuery,
332 relation_aggregates: &[RelationAggregate],
333 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
334 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
335 .map(SmartList::from)
336 }
337
338 pub async fn fetch_entities<T>(
339 &self,
340 query: &SelectQuery,
341 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342 where
343 T: Entity,
344 {
345 let query = self
346 .prepare_select_query(query)
347 .map_err(RepositoryError::Runtime)?;
348
349 self.repository.fetch_entities(&query).await
350 }
351
352 pub async fn fetch_entities_with_relation_aggregates<T>(
353 &self,
354 query: &SelectQuery,
355 relation_aggregates: &[RelationAggregate],
356 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
357 where
358 T: Entity,
359 {
360 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
361 .into_iter()
362 .map(|record| {
363 let mut entity = T::from_record(record)?;
364 let root = crate::EntityRoot::default();
365 entity.on_loaded(&root as &dyn std::any::Any);
366 Ok(entity)
367 })
368 .collect::<Result<Vec<_>, _>>()
369 .map(SmartList::from)
370 .map_err(RepositoryError::Entity)
371 }
372
373 pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
374 &self,
375 query: &SelectQuery,
376 relation_aggregates: &[RelationAggregate],
377 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
378 where
379 T: Entity,
380 {
381 let query = self
382 .prepare_select_query(query)
383 .map_err(RepositoryError::Runtime)?;
384
385 let mut rows = self.fetch_prepared_all(&query).await?;
386 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
387 self.enhance_relations(&mut rows).await?;
388 self.enhance_query_relations(&mut rows, &query).await?;
389 rows.into_iter()
390 .map(|record| {
391 let mut entity = T::from_record(record)?;
392 let root = crate::EntityRoot::default();
393 entity.on_loaded(&root as &dyn std::any::Any);
394 Ok(entity)
395 })
396 .collect::<Result<Vec<_>, _>>()
397 .map(SmartList::from)
398 .map_err(RepositoryError::Entity)
399 }
400
401 pub async fn fetch_enhanced_entities<T>(
402 &self,
403 query: &SelectQuery,
404 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
405 where
406 T: Entity,
407 {
408 let query = self
409 .prepare_select_query(query)
410 .map_err(RepositoryError::Runtime)?;
411
412 let mut rows = self.fetch_prepared_all(&query).await?;
413 self.enhance_relations(&mut rows).await?;
414 self.enhance_query_relations(&mut rows, &query).await?;
415 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
416 rows.into_iter()
417 .map(|record| {
418 let mut entity = T::from_record(record)?;
419 if let Some(ref root) = root {
420 entity.on_loaded(root as &dyn std::any::Any);
421 }
422 Ok(entity)
423 })
424 .collect::<Result<Vec<_>, _>>()
425 .map(SmartList::from)
426 .map_err(RepositoryError::Entity)
427 }
428
429 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
430 let command = self
431 .prepare_insert_command(command)
432 .map_err(RepositoryError::Runtime)?;
433 self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
434 }
435
436 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
437 let command = self
438 .prepare_update_command(command)
439 .map_err(RepositoryError::Runtime)?;
440 self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
441 }
442
443 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
444 self.delete_scoped(command, self.trace_context.clone()).await
445 }
446
447 pub async fn delete_scoped(
448 &self,
449 command: &DeleteCommand,
450 trace_chain: Vec<teaql_core::TraceNode>,
451 ) -> Result<u64, RepositoryError<E::Error>> {
452 let mut command = command.clone();
453 command.trace_chain = trace_chain.clone();
454 if let Some(behavior) = self.behavior() {
455 behavior
456 .before_delete(self.repository.metadata.context, &mut command)
457 .map_err(RepositoryError::Runtime)?;
458 }
459 self.enforce_delete_policy(&mut command)
460 .map_err(RepositoryError::Runtime)?;
461
462 let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
463 let affected = self.repository.delete(&command).await?;
464
465 let mut event = RawAuditEvent::deleted_with_old_values(
466 command.entity,
467 command.id,
468 command.expected_version,
469 old_values,
470 );
471 event.trace_chain = trace_chain;
472 self.emit_event(event)
473 .map_err(RepositoryError::Runtime)?;
474 Ok(affected)
475 }
476
477 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
478 let mut command = command.clone();
479 command.trace_chain = self.trace_context.clone();
480 if let Some(behavior) = self.behavior() {
481 behavior
482 .before_recover(self.repository.metadata.context, &mut command)
483 .map_err(RepositoryError::Runtime)?;
484 }
485 self.enforce_recover_policy(&mut command)
486 .map_err(RepositoryError::Runtime)?;
487 let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
488 let affected = self.repository.recover(&command).await?;
489 let event = RawAuditEvent::recovered_with_old_values(
490 command.entity,
491 command.id,
492 command.expected_version,
493 old_values,
494 );
495 self.emit_event(event)
496 .map_err(RepositoryError::Runtime)?;
497 Ok(affected)
498 }
499
500 fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
501 self.repository.metadata.context.send_event(event)
502 }
503
504 #[allow(dead_code)]
505 pub(super) async fn execute_prepared_insert(
506 &self,
507 command: InsertCommand,
508 ) -> Result<u64, RepositoryError<E::Error>> {
509 self.execute_prepared_insert_with_comment(command, Vec::new()).await
510 }
511
512 pub(super) async fn execute_prepared_insert_with_comment(
513 &self,
514 mut command: InsertCommand,
515 trace_chain: Vec<teaql_core::TraceNode>,
516 ) -> Result<u64, RepositoryError<E::Error>> {
517 command.trace_chain = trace_chain.clone();
518 let affected = self.repository.insert(&command).await?;
519 let mut event = RawAuditEvent::created(command.entity, command.values);
520 event.trace_chain = trace_chain;
521 self.emit_event(event).map_err(RepositoryError::Runtime)?;
522 Ok(affected)
523 }
524
525 pub(super) async fn execute_prepared_batch_insert(
526 &self,
527 command: teaql_core::BatchInsertCommand,
528 ) -> Result<u64, RepositoryError<E::Error>> {
529 if command.batch_values.is_empty() {
530 return Ok(0);
531 }
532 let affected = self.repository.batch_insert(&command).await?;
533
534 let entity = command.entity.clone();
535 for (i, values) in command.batch_values.into_iter().enumerate() {
536 let mut event = RawAuditEvent::created(entity.clone(), values);
537 if i < command.trace_chains.len() {
538 event.trace_chain = command.trace_chains[i].clone();
539 }
540 self.emit_event(event).map_err(RepositoryError::Runtime)?;
541 }
542 Ok(affected)
543 }
544
545 #[allow(dead_code)]
546 pub(super) async fn execute_prepared_update(
547 &self,
548 command: UpdateCommand,
549 ) -> Result<u64, RepositoryError<E::Error>> {
550 self.execute_prepared_update_with_comment(command, Vec::new()).await
551 }
552
553 pub(super) async fn execute_prepared_update_with_comment(
554 &self,
555 mut command: UpdateCommand,
556 trace_chain: Vec<teaql_core::TraceNode>,
557 ) -> Result<u64, RepositoryError<E::Error>> {
558 command.trace_chain = trace_chain.clone();
559
560 let mut old_values = command.old_values.clone();
561 let needs_fetch = match &old_values {
562 Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
563 None => true,
564 };
565 if needs_fetch {
566 old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
567 }
568
569 let affected = self.repository.update(&command).await?;
570 let updated_fields = command.values.keys().cloned().collect();
571 let mut values = command.values.clone();
572 values.insert("id".to_owned(), command.id.clone());
573 if let Some(version) = command.expected_version {
574 values.insert("version".to_owned(), Value::I64(version + 1));
575 }
576 let mut new_values = old_values.clone().unwrap_or_default();
577 for (field, value) in &values {
578 new_values.insert(field.clone(), value.clone());
579 }
580 let mut event = RawAuditEvent::updated_with_old_values(
581 command.entity,
582 values,
583 old_values,
584 new_values,
585 updated_fields,
586 );
587 event.trace_chain = trace_chain;
588 self.emit_event(event).map_err(RepositoryError::Runtime)?;
589 Ok(affected)
590 }
591
592 pub(super) async fn execute_prepared_batch_update(
593 &self,
594 command: teaql_core::BatchUpdateCommand,
595 ) -> Result<u64, RepositoryError<E::Error>> {
596 if command.batch_values.is_empty() {
597 return Ok(0);
598 }
599 let affected = self.repository.batch_update(&command).await?;
600
601 let entity = command.entity.clone();
602 for (i, values) in command.batch_values.into_iter().enumerate() {
603 let mut full_values = values.clone();
604 full_values.insert("id".to_owned(), command.batch_ids[i].clone());
605 if let Some(Some(version)) = command.batch_expected_versions.get(i) {
606 full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
607 }
608
609 let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
610 let mut new_values = old_values.clone().unwrap_or_default();
611 for (field, value) in &full_values {
612 new_values.insert(field.clone(), value.clone());
613 }
614
615 let mut event = RawAuditEvent::updated_with_old_values(
616 entity.clone(),
617 full_values,
618 old_values,
619 new_values,
620 command.update_fields.clone(),
621 );
622 if i < command.trace_chains.len() {
623 event.trace_chain = command.trace_chains[i].clone();
624 }
625 self.emit_event(event).map_err(RepositoryError::Runtime)?;
626 }
627 Ok(affected)
628 }
629
630 fn fetch_current_event_row(
631 &self,
632 _entity: &str,
633 _id: &Value,
634 _trace_chain: Vec<teaql_core::TraceNode>,
635 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
636 Ok(None)
639 }
640
641
642 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
643 ResolvedRepository {
644 entity,
645 repository: ContextRepository {
646 metadata: UserContextMetadata {
647 context: self.repository.metadata.context,
648 },
649 executor: self.repository.executor,
650 },
651 trace_context: Vec::new(),
652 }
653 }
654}