1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50 policy.enforce_insert(self.repository.metadata.context, command)?;
51 }
52 Ok(())
53 }
54
55 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57 policy.enforce_update(self.repository.metadata.context, command)?;
58 }
59 Ok(())
60 }
61
62 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64 policy.enforce_delete(self.repository.metadata.context, command)?;
65 }
66 Ok(())
67 }
68
69 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71 policy.enforce_recover(self.repository.metadata.context, command)?;
72 }
73 Ok(())
74 }
75
76 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77 let mut query = query.clone();
78 if let Some(behavior) = self.query_behavior(&query.entity) {
79 behavior.before_select(self.repository.metadata.context, &mut query)?;
80 }
81 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82 policy.enforce_select(self.repository.metadata.context, &mut query)?;
83 }
84 Ok(query)
85 }
86
87 pub fn prepare_insert_command(
88 &self,
89 command: &InsertCommand,
90 ) -> Result<InsertCommand, RuntimeError> {
91 let mut command = command.clone();
92 if let Some(behavior) = self.behavior() {
93 behavior.before_insert(self.repository.metadata.context, &mut command)?;
94 }
95 self.enforce_insert_policy(&mut command)?;
96
97 let entity = self
98 .repository
99 .metadata
100 .context
101 .require_entity(&command.entity)?;
102 if let Some(id_property) = entity.id_property() {
103 let needs_id = !command.values.contains_key(&id_property.name)
104 || is_unassigned_id(command.values.get(&id_property.name));
105 if needs_id {
106 let id = self.repository.metadata.context.next_id(&command.entity)?;
107 command
108 .values
109 .insert(id_property.name.clone(), Value::U64(id));
110 }
111 }
112 ensure_initial_version(&mut command.values, entity);
113 mark_record_status(&mut command.values, CheckObjectStatus::Create);
114 let check_result = self
115 .repository
116 .metadata
117 .context
118 .check_and_fix_record(&command.entity, &mut command.values);
119 clear_record_status(&mut command.values);
120 check_result?;
121
122 Ok(command)
123 }
124
125 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126 UpdateCommand::new(self.entity.clone(), id)
127 }
128
129 pub fn prepare_update_command(
130 &self,
131 command: &UpdateCommand,
132 ) -> Result<UpdateCommand, RuntimeError> {
133 let mut command = command.clone();
134 if let Some(behavior) = self.behavior() {
135 behavior.before_update(self.repository.metadata.context, &mut command)?;
136 }
137 self.enforce_update_policy(&mut command)?;
138 mark_record_status(&mut command.values, CheckObjectStatus::Update);
139 let check_result = self
140 .repository
141 .metadata
142 .context
143 .check_and_fix_record(&command.entity, &mut command.values);
144 clear_record_status(&mut command.values);
145 check_result?;
146 Ok(command)
147 }
148
149 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150 DeleteCommand::new(self.entity.clone(), id)
151 }
152
153 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154 RecoverCommand::new(self.entity.clone(), id, expected_version)
155 }
156
157 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158 let query = self.prepare_select_query(query)?;
159 self.repository.compile(&query)
160 }
161
162 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163 let query = self
164 .prepare_select_query(query)
165 .map_err(RepositoryError::Runtime)?;
166 let mut rows = self.fetch_prepared_query(&query)?;
167 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
168 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
169 Ok(rows)
170 }
171
172 fn fetch_prepared_query(
173 &self,
174 query: &SelectQuery,
175 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
176 let compiled = self
177 .repository
178 .compile(query)
179 .map_err(RepositoryError::Runtime)?;
180 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
181 if let Some(cache) = self
182 .repository
183 .metadata
184 .context
185 .get_resource::<Arc<dyn AggregationCacheBackend>>()
186 {
187 return self.fetch_prepared_query_with_cache(
188 query,
189 &compiled,
190 options,
191 cache.as_ref(),
192 );
193 }
194 if let Some(cache) = self
195 .repository
196 .metadata
197 .context
198 .get_resource::<InMemoryAggregationCache>()
199 {
200 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
201 }
202 }
203 let started_at = SystemTime::now();
204 let started = Instant::now();
205 let rows = self
206 .repository
207 .executor
208 .fetch_all(&compiled)
209 .map_err(RepositoryError::Executor)?;
210 self.repository.log_sql_result(
211 SqlLogOperation::Select,
212 &compiled,
213 started_at,
214 started,
215 Some(rows.len()),
216 Some(query.entity.clone()),
217 None,
218 );
219 Ok(rows)
220 }
221
222 fn fetch_prepared_query_with_cache(
223 &self,
224 query: &SelectQuery,
225 compiled: &CompiledQuery,
226 options: AggregationCacheOptions,
227 cache: &dyn AggregationCacheBackend,
228 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
229 let key = aggregation_cache_key(
230 cache.namespace(),
231 &aggregation_cache_namespace(&query.entity),
232 compiled,
233 );
234 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
235 return Ok(rows);
236 }
237 let started_at = SystemTime::now();
238 let started = Instant::now();
239 let rows = self
240 .repository
241 .executor
242 .fetch_all(compiled)
243 .map_err(RepositoryError::Executor)?;
244 self.repository.log_sql_result(
245 SqlLogOperation::Select,
246 compiled,
247 started_at,
248 started,
249 Some(rows.len()),
250 Some(query.entity.clone()),
251 None,
252 );
253 cache.put(key, rows.clone());
254 Ok(rows)
255 }
256
257 pub fn fetch_all_with_relation_aggregates(
258 &self,
259 query: &SelectQuery,
260 relation_aggregates: &[RelationAggregate],
261 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
262 let mut rows = self.fetch_all(query)?;
263 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
264 Ok(rows)
265 }
266
267 pub fn fetch_smart_list(
268 &self,
269 query: &SelectQuery,
270 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
271 let query = self
272 .prepare_select_query(query)
273 .map_err(RepositoryError::Runtime)?;
274 self.repository.fetch_smart_list(&query)
275 }
276
277 pub fn fetch_smart_list_with_relation_aggregates(
278 &self,
279 query: &SelectQuery,
280 relation_aggregates: &[RelationAggregate],
281 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
282 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
283 .map(SmartList::from)
284 }
285
286 pub fn fetch_entities<T>(
287 &self,
288 query: &SelectQuery,
289 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
290 where
291 T: Entity,
292 {
293 let query = self
294 .prepare_select_query(query)
295 .map_err(RepositoryError::Runtime)?;
296 self.repository.fetch_entities(&query)
297 }
298
299 pub fn fetch_entities_with_relation_aggregates<T>(
300 &self,
301 query: &SelectQuery,
302 relation_aggregates: &[RelationAggregate],
303 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
304 where
305 T: Entity,
306 {
307 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
308 .into_iter()
309 .map(T::from_record)
310 .collect::<Result<Vec<_>, _>>()
311 .map(SmartList::from)
312 .map_err(RepositoryError::Entity)
313 }
314
315 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
316 &self,
317 query: &SelectQuery,
318 relation_aggregates: &[RelationAggregate],
319 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
320 where
321 T: Entity,
322 {
323 let query = self
324 .prepare_select_query(query)
325 .map_err(RepositoryError::Runtime)?;
326
327 let mut rows = self.repository.fetch_all(&query)?;
328 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
329 self.enhance_query_relations(&mut rows, &query)?;
330 self.enhance_relations(&mut rows)?;
331 rows.into_iter()
332 .map(T::from_record)
333 .collect::<Result<Vec<_>, _>>()
334 .map(SmartList::from)
335 .map_err(RepositoryError::Entity)
336 }
337
338 pub fn fetch_enhanced_entities<T>(
339 &self,
340 query: &SelectQuery,
341 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342 where
343 T: Entity,
344 {
345 let query = self
346 .prepare_select_query(query)
347 .map_err(RepositoryError::Runtime)?;
348
349 let mut rows = self.repository.fetch_all(&query)?;
350 self.enhance_query_relations(&mut rows, &query)?;
351 self.enhance_relations(&mut rows)?;
352 rows.into_iter()
353 .map(T::from_record)
354 .collect::<Result<Vec<_>, _>>()
355 .map(SmartList::from)
356 .map_err(RepositoryError::Entity)
357 }
358
359 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
360 let command = self
361 .prepare_insert_command(command)
362 .map_err(RepositoryError::Runtime)?;
363 self.execute_prepared_insert(command)
364 }
365
366 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
367 let command = self
368 .prepare_update_command(command)
369 .map_err(RepositoryError::Runtime)?;
370 self.execute_prepared_update(command)
371 }
372
373 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
374 let mut command = command.clone();
375 if let Some(behavior) = self.behavior() {
376 behavior
377 .before_delete(self.repository.metadata.context, &mut command)
378 .map_err(RepositoryError::Runtime)?;
379 }
380 self.enforce_delete_policy(&mut command)
381 .map_err(RepositoryError::Runtime)?;
382 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
383 let affected = self.repository.delete(&command)?;
384 self.emit_event(EntityEvent::deleted_with_old_values(
385 command.entity,
386 command.id,
387 command.expected_version,
388 old_values,
389 ))
390 .map_err(RepositoryError::Runtime)?;
391 Ok(affected)
392 }
393
394 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
395 let mut command = command.clone();
396 if let Some(behavior) = self.behavior() {
397 behavior
398 .before_recover(self.repository.metadata.context, &mut command)
399 .map_err(RepositoryError::Runtime)?;
400 }
401 self.enforce_recover_policy(&mut command)
402 .map_err(RepositoryError::Runtime)?;
403 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
404 let affected = self.repository.recover(&command)?;
405 self.emit_event(EntityEvent::recovered_with_old_values(
406 command.entity,
407 command.id,
408 command.expected_version,
409 old_values,
410 ))
411 .map_err(RepositoryError::Runtime)?;
412 Ok(affected)
413 }
414
415 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
416 self.repository.metadata.context.send_event(event)
417 }
418
419 pub(super) fn execute_prepared_insert(
420 &self,
421 command: InsertCommand,
422 ) -> Result<u64, RepositoryError<E::Error>> {
423 let affected = self.repository.insert(&command)?;
424 self.emit_event(EntityEvent::created(command.entity, command.values))
425 .map_err(RepositoryError::Runtime)?;
426 Ok(affected)
427 }
428
429 pub(super) fn execute_prepared_update(
430 &self,
431 command: UpdateCommand,
432 ) -> Result<u64, RepositoryError<E::Error>> {
433 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
434 let affected = self.repository.update(&command)?;
435 let updated_fields = command.values.keys().cloned().collect();
436 let mut values = command.values.clone();
437 values.insert("id".to_owned(), command.id.clone());
438 if let Some(version) = command.expected_version {
439 values.insert("version".to_owned(), Value::I64(version + 1));
440 }
441 let mut new_values = old_values.clone().unwrap_or_default();
442 for (field, value) in &values {
443 new_values.insert(field.clone(), value.clone());
444 }
445 self.emit_event(EntityEvent::updated_with_old_values(
446 command.entity,
447 values,
448 old_values,
449 new_values,
450 updated_fields,
451 ))
452 .map_err(RepositoryError::Runtime)?;
453 Ok(affected)
454 }
455
456 fn fetch_current_event_row(
457 &self,
458 entity: &str,
459 id: &Value,
460 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
461 if self.repository.metadata.context.event_sink.is_none() {
462 return Ok(None);
463 }
464 let descriptor = self
465 .repository
466 .metadata
467 .context
468 .require_entity(entity)
469 .map_err(RepositoryError::Runtime)?;
470 let Some(id_property) = descriptor.id_property() else {
471 return Ok(None);
472 };
473 let mut rows = self.fetch_all(
474 &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
475 )?;
476 Ok(rows.pop())
477 }
478
479 pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
480 ResolvedRepository {
481 entity,
482 repository: ContextRepository {
483 metadata: UserContextMetadata {
484 context: self.repository.metadata.context,
485 },
486 dialect: self.repository.dialect,
487 executor: self.repository.executor,
488 },
489 }
490 }
491}