1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50 policy.enforce_insert(self.repository.metadata.context, command)?;
51 }
52 Ok(())
53 }
54
55 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57 policy.enforce_update(self.repository.metadata.context, command)?;
58 }
59 Ok(())
60 }
61
62 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64 policy.enforce_delete(self.repository.metadata.context, command)?;
65 }
66 Ok(())
67 }
68
69 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71 policy.enforce_recover(self.repository.metadata.context, command)?;
72 }
73 Ok(())
74 }
75
76 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77 let mut query = query.clone();
78 if let Some(behavior) = self.query_behavior(&query.entity) {
79 behavior.before_select(self.repository.metadata.context, &mut query)?;
80 }
81 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82 policy.enforce_select(self.repository.metadata.context, &mut query)?;
83 }
84 Ok(query)
85 }
86
87 pub fn prepare_insert_command(
88 &self,
89 command: &InsertCommand,
90 ) -> Result<InsertCommand, RuntimeError> {
91 let mut command = command.clone();
92 if let Some(behavior) = self.behavior() {
93 behavior.before_insert(self.repository.metadata.context, &mut command)?;
94 }
95 self.enforce_insert_policy(&mut command)?;
96
97 let entity = self
98 .repository
99 .metadata
100 .context
101 .require_entity(&command.entity)?;
102 if let Some(id_property) = entity.id_property() {
103 let needs_id = !command.values.contains_key(&id_property.name)
104 || is_unassigned_id(command.values.get(&id_property.name));
105 if needs_id {
106 let id = self.repository.metadata.context.next_id(&command.entity)?;
107 command
108 .values
109 .insert(id_property.name.clone(), Value::U64(id));
110 }
111 }
112 mark_record_status(&mut command.values, CheckObjectStatus::Create);
113 let check_result = self
114 .repository
115 .metadata
116 .context
117 .check_and_fix_record(&command.entity, &mut command.values);
118 clear_record_status(&mut command.values);
119 check_result?;
120
121 Ok(command)
122 }
123
124 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
125 UpdateCommand::new(self.entity.clone(), id)
126 }
127
128 pub fn prepare_update_command(
129 &self,
130 command: &UpdateCommand,
131 ) -> Result<UpdateCommand, RuntimeError> {
132 let mut command = command.clone();
133 if let Some(behavior) = self.behavior() {
134 behavior.before_update(self.repository.metadata.context, &mut command)?;
135 }
136 self.enforce_update_policy(&mut command)?;
137 mark_record_status(&mut command.values, CheckObjectStatus::Update);
138 let check_result = self
139 .repository
140 .metadata
141 .context
142 .check_and_fix_record(&command.entity, &mut command.values);
143 clear_record_status(&mut command.values);
144 check_result?;
145 Ok(command)
146 }
147
148 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
149 DeleteCommand::new(self.entity.clone(), id)
150 }
151
152 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
153 RecoverCommand::new(self.entity.clone(), id, expected_version)
154 }
155
156 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
157 let query = self.prepare_select_query(query)?;
158 self.repository.compile(&query)
159 }
160
161 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
162 let query = self
163 .prepare_select_query(query)
164 .map_err(RepositoryError::Runtime)?;
165 let mut rows = self.fetch_prepared_query(&query)?;
166 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
167 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
168 Ok(rows)
169 }
170
171 fn fetch_prepared_query(
172 &self,
173 query: &SelectQuery,
174 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
175 let compiled = self
176 .repository
177 .compile(query)
178 .map_err(RepositoryError::Runtime)?;
179 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
180 if let Some(cache) = self
181 .repository
182 .metadata
183 .context
184 .get_resource::<Arc<dyn AggregationCacheBackend>>()
185 {
186 return self.fetch_prepared_query_with_cache(
187 query,
188 &compiled,
189 options,
190 cache.as_ref(),
191 );
192 }
193 if let Some(cache) = self
194 .repository
195 .metadata
196 .context
197 .get_resource::<InMemoryAggregationCache>()
198 {
199 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
200 }
201 }
202 let started_at = SystemTime::now();
203 let started = Instant::now();
204 let rows = self
205 .repository
206 .executor
207 .fetch_all(&compiled)
208 .map_err(RepositoryError::Executor)?;
209 self.repository.log_sql_result(
210 SqlLogOperation::Select,
211 &compiled,
212 started_at,
213 started,
214 Some(rows.len()),
215 Some(query.entity.clone()),
216 None,
217 );
218 Ok(rows)
219 }
220
221 fn fetch_prepared_query_with_cache(
222 &self,
223 query: &SelectQuery,
224 compiled: &CompiledQuery,
225 options: AggregationCacheOptions,
226 cache: &dyn AggregationCacheBackend,
227 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
228 let key = aggregation_cache_key(
229 cache.namespace(),
230 &aggregation_cache_namespace(&query.entity),
231 compiled,
232 );
233 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
234 return Ok(rows);
235 }
236 let started_at = SystemTime::now();
237 let started = Instant::now();
238 let rows = self
239 .repository
240 .executor
241 .fetch_all(compiled)
242 .map_err(RepositoryError::Executor)?;
243 self.repository.log_sql_result(
244 SqlLogOperation::Select,
245 compiled,
246 started_at,
247 started,
248 Some(rows.len()),
249 Some(query.entity.clone()),
250 None,
251 );
252 cache.put(key, rows.clone());
253 Ok(rows)
254 }
255
256 pub fn fetch_all_with_relation_aggregates(
257 &self,
258 query: &SelectQuery,
259 relation_aggregates: &[RelationAggregate],
260 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
261 let mut rows = self.fetch_all(query)?;
262 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
263 Ok(rows)
264 }
265
266 pub fn fetch_smart_list(
267 &self,
268 query: &SelectQuery,
269 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
270 let query = self
271 .prepare_select_query(query)
272 .map_err(RepositoryError::Runtime)?;
273 self.repository.fetch_smart_list(&query)
274 }
275
276 pub fn fetch_smart_list_with_relation_aggregates(
277 &self,
278 query: &SelectQuery,
279 relation_aggregates: &[RelationAggregate],
280 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
281 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
282 .map(SmartList::from)
283 }
284
285 pub fn fetch_entities<T>(
286 &self,
287 query: &SelectQuery,
288 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
289 where
290 T: Entity,
291 {
292 let query = self
293 .prepare_select_query(query)
294 .map_err(RepositoryError::Runtime)?;
295 self.repository.fetch_entities(&query)
296 }
297
298 pub fn fetch_entities_with_relation_aggregates<T>(
299 &self,
300 query: &SelectQuery,
301 relation_aggregates: &[RelationAggregate],
302 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
303 where
304 T: Entity,
305 {
306 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
307 .into_iter()
308 .map(T::from_record)
309 .collect::<Result<Vec<_>, _>>()
310 .map(SmartList::from)
311 .map_err(RepositoryError::Entity)
312 }
313
314 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
315 &self,
316 query: &SelectQuery,
317 relation_aggregates: &[RelationAggregate],
318 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
319 where
320 T: Entity,
321 {
322 let query = self
323 .prepare_select_query(query)
324 .map_err(RepositoryError::Runtime)?;
325
326 let mut rows = self.repository.fetch_all(&query)?;
327 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
328 self.enhance_query_relations(&mut rows, &query)?;
329 self.enhance_relations(&mut rows)?;
330 rows.into_iter()
331 .map(T::from_record)
332 .collect::<Result<Vec<_>, _>>()
333 .map(SmartList::from)
334 .map_err(RepositoryError::Entity)
335 }
336
337 pub fn fetch_enhanced_entities<T>(
338 &self,
339 query: &SelectQuery,
340 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
341 where
342 T: Entity,
343 {
344 let query = self
345 .prepare_select_query(query)
346 .map_err(RepositoryError::Runtime)?;
347
348 let mut rows = self.repository.fetch_all(&query)?;
349 self.enhance_query_relations(&mut rows, &query)?;
350 self.enhance_relations(&mut rows)?;
351 rows.into_iter()
352 .map(T::from_record)
353 .collect::<Result<Vec<_>, _>>()
354 .map(SmartList::from)
355 .map_err(RepositoryError::Entity)
356 }
357
358 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
359 let command = self
360 .prepare_insert_command(command)
361 .map_err(RepositoryError::Runtime)?;
362 self.execute_prepared_insert(command)
363 }
364
365 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
366 let command = self
367 .prepare_update_command(command)
368 .map_err(RepositoryError::Runtime)?;
369 self.execute_prepared_update(command)
370 }
371
372 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
373 let mut command = command.clone();
374 if let Some(behavior) = self.behavior() {
375 behavior
376 .before_delete(self.repository.metadata.context, &mut command)
377 .map_err(RepositoryError::Runtime)?;
378 }
379 self.enforce_delete_policy(&mut command)
380 .map_err(RepositoryError::Runtime)?;
381 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
382 let affected = self.repository.delete(&command)?;
383 self.emit_event(EntityEvent::deleted_with_old_values(
384 command.entity,
385 command.id,
386 command.expected_version,
387 old_values,
388 ))
389 .map_err(RepositoryError::Runtime)?;
390 Ok(affected)
391 }
392
393 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
394 let mut command = command.clone();
395 if let Some(behavior) = self.behavior() {
396 behavior
397 .before_recover(self.repository.metadata.context, &mut command)
398 .map_err(RepositoryError::Runtime)?;
399 }
400 self.enforce_recover_policy(&mut command)
401 .map_err(RepositoryError::Runtime)?;
402 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
403 let affected = self.repository.recover(&command)?;
404 self.emit_event(EntityEvent::recovered_with_old_values(
405 command.entity,
406 command.id,
407 command.expected_version,
408 old_values,
409 ))
410 .map_err(RepositoryError::Runtime)?;
411 Ok(affected)
412 }
413
414 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
415 self.repository.metadata.context.send_event(event)
416 }
417
418 pub(super) fn execute_prepared_insert(
419 &self,
420 command: InsertCommand,
421 ) -> Result<u64, RepositoryError<E::Error>> {
422 let affected = self.repository.insert(&command)?;
423 self.emit_event(EntityEvent::created(command.entity, command.values))
424 .map_err(RepositoryError::Runtime)?;
425 Ok(affected)
426 }
427
428 pub(super) fn execute_prepared_update(
429 &self,
430 command: UpdateCommand,
431 ) -> Result<u64, RepositoryError<E::Error>> {
432 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
433 let affected = self.repository.update(&command)?;
434 let updated_fields = command.values.keys().cloned().collect();
435 let mut values = command.values.clone();
436 values.insert("id".to_owned(), command.id.clone());
437 if let Some(version) = command.expected_version {
438 values.insert("version".to_owned(), Value::I64(version + 1));
439 }
440 let mut new_values = old_values.clone().unwrap_or_default();
441 for (field, value) in &values {
442 new_values.insert(field.clone(), value.clone());
443 }
444 self.emit_event(EntityEvent::updated_with_old_values(
445 command.entity,
446 values,
447 old_values,
448 new_values,
449 updated_fields,
450 ))
451 .map_err(RepositoryError::Runtime)?;
452 Ok(affected)
453 }
454
455 fn fetch_current_event_row(
456 &self,
457 entity: &str,
458 id: &Value,
459 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
460 if self.repository.metadata.context.event_sink.is_none() {
461 return Ok(None);
462 }
463 let descriptor = self
464 .repository
465 .metadata
466 .context
467 .require_entity(entity)
468 .map_err(RepositoryError::Runtime)?;
469 let Some(id_property) = descriptor.id_property() else {
470 return Ok(None);
471 };
472 let mut rows = self.fetch_all(
473 &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
474 )?;
475 Ok(rows.pop())
476 }
477
478 pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
479 ResolvedRepository {
480 entity,
481 repository: ContextRepository {
482 metadata: UserContextMetadata {
483 context: self.repository.metadata.context,
484 },
485 dialect: self.repository.dialect,
486 executor: self.repository.executor,
487 },
488 }
489 }
490}
491
492fn is_unassigned_id(value: Option<&Value>) -> bool {
493 matches!(
494 value,
495 None | Some(Value::Null) | Some(Value::U64(0)) | Some(Value::I64(0))
496 )
497}