1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 pub fn prepare_insert_command(
49 &self,
50 command: &InsertCommand,
51 ) -> Result<InsertCommand, RuntimeError> {
52 let mut command = command.clone();
53 if let Some(behavior) = self.behavior() {
54 behavior.before_insert(self.repository.metadata.context, &mut command)?;
55 }
56
57 let entity = self
58 .repository
59 .metadata
60 .context
61 .require_entity(&command.entity)?;
62 if let Some(id_property) = entity.id_property() {
63 let needs_id = !command.values.contains_key(&id_property.name)
64 || matches!(command.values.get(&id_property.name), Some(Value::Null));
65 if needs_id {
66 let id = self.repository.metadata.context.next_id(&command.entity)?;
67 command
68 .values
69 .insert(id_property.name.clone(), Value::U64(id));
70 }
71 }
72 mark_record_status(&mut command.values, CheckObjectStatus::Create);
73 let check_result = self
74 .repository
75 .metadata
76 .context
77 .check_and_fix_record(&command.entity, &mut command.values);
78 clear_record_status(&mut command.values);
79 check_result?;
80
81 Ok(command)
82 }
83
84 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
85 UpdateCommand::new(self.entity.clone(), id)
86 }
87
88 pub fn prepare_update_command(
89 &self,
90 command: &UpdateCommand,
91 ) -> Result<UpdateCommand, RuntimeError> {
92 let mut command = command.clone();
93 if let Some(behavior) = self.behavior() {
94 behavior.before_update(self.repository.metadata.context, &mut command)?;
95 }
96 mark_record_status(&mut command.values, CheckObjectStatus::Update);
97 let check_result = self
98 .repository
99 .metadata
100 .context
101 .check_and_fix_record(&command.entity, &mut command.values);
102 clear_record_status(&mut command.values);
103 check_result?;
104 Ok(command)
105 }
106
107 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
108 DeleteCommand::new(self.entity.clone(), id)
109 }
110
111 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
112 RecoverCommand::new(self.entity.clone(), id, expected_version)
113 }
114
115 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
116 let mut query = query.clone();
117 if let Some(behavior) = self.query_behavior(&query.entity) {
118 behavior.before_select(self.repository.metadata.context, &mut query)?;
119 }
120 self.repository.compile(&query)
121 }
122
123 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
124 let mut query = query.clone();
125 if let Some(behavior) = self.query_behavior(&query.entity) {
126 behavior
127 .before_select(self.repository.metadata.context, &mut query)
128 .map_err(RepositoryError::Runtime)?;
129 }
130 let mut rows = self.fetch_prepared_query(&query)?;
131 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
132 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
133 Ok(rows)
134 }
135
136 fn fetch_prepared_query(
137 &self,
138 query: &SelectQuery,
139 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
140 let compiled = self
141 .repository
142 .compile(query)
143 .map_err(RepositoryError::Runtime)?;
144 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
145 if let Some(cache) = self
146 .repository
147 .metadata
148 .context
149 .get_resource::<Arc<dyn AggregationCacheBackend>>()
150 {
151 return self.fetch_prepared_query_with_cache(
152 query,
153 &compiled,
154 options,
155 cache.as_ref(),
156 );
157 }
158 if let Some(cache) = self
159 .repository
160 .metadata
161 .context
162 .get_resource::<InMemoryAggregationCache>()
163 {
164 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
165 }
166 }
167 let started_at = SystemTime::now();
168 let started = Instant::now();
169 let rows = self
170 .repository
171 .executor
172 .fetch_all(&compiled)
173 .map_err(RepositoryError::Executor)?;
174 self.repository.log_sql_result(
175 SqlLogOperation::Select,
176 &compiled,
177 started_at,
178 started,
179 Some(rows.len()),
180 Some(query.entity.clone()),
181 None,
182 );
183 Ok(rows)
184 }
185
186 fn fetch_prepared_query_with_cache(
187 &self,
188 query: &SelectQuery,
189 compiled: &CompiledQuery,
190 options: AggregationCacheOptions,
191 cache: &dyn AggregationCacheBackend,
192 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
193 let key = aggregation_cache_key(
194 cache.namespace(),
195 &aggregation_cache_namespace(&query.entity),
196 compiled,
197 );
198 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
199 return Ok(rows);
200 }
201 let started_at = SystemTime::now();
202 let started = Instant::now();
203 let rows = self
204 .repository
205 .executor
206 .fetch_all(compiled)
207 .map_err(RepositoryError::Executor)?;
208 self.repository.log_sql_result(
209 SqlLogOperation::Select,
210 compiled,
211 started_at,
212 started,
213 Some(rows.len()),
214 Some(query.entity.clone()),
215 None,
216 );
217 cache.put(key, rows.clone());
218 Ok(rows)
219 }
220
221 pub fn fetch_all_with_relation_aggregates(
222 &self,
223 query: &SelectQuery,
224 relation_aggregates: &[RelationAggregate],
225 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
226 let mut rows = self.fetch_all(query)?;
227 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
228 Ok(rows)
229 }
230
231 pub fn fetch_smart_list(
232 &self,
233 query: &SelectQuery,
234 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
235 let mut query = query.clone();
236 if let Some(behavior) = self.query_behavior(&query.entity) {
237 behavior
238 .before_select(self.repository.metadata.context, &mut query)
239 .map_err(RepositoryError::Runtime)?;
240 }
241 self.repository.fetch_smart_list(&query)
242 }
243
244 pub fn fetch_smart_list_with_relation_aggregates(
245 &self,
246 query: &SelectQuery,
247 relation_aggregates: &[RelationAggregate],
248 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
249 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
250 .map(SmartList::from)
251 }
252
253 pub fn fetch_entities<T>(
254 &self,
255 query: &SelectQuery,
256 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
257 where
258 T: Entity,
259 {
260 let mut query = query.clone();
261 if let Some(behavior) = self.query_behavior(&query.entity) {
262 behavior
263 .before_select(self.repository.metadata.context, &mut query)
264 .map_err(RepositoryError::Runtime)?;
265 }
266 self.repository.fetch_entities(&query)
267 }
268
269 pub fn fetch_entities_with_relation_aggregates<T>(
270 &self,
271 query: &SelectQuery,
272 relation_aggregates: &[RelationAggregate],
273 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
274 where
275 T: Entity,
276 {
277 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
278 .into_iter()
279 .map(T::from_record)
280 .collect::<Result<Vec<_>, _>>()
281 .map(SmartList::from)
282 .map_err(RepositoryError::Entity)
283 }
284
285 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
286 &self,
287 query: &SelectQuery,
288 relation_aggregates: &[RelationAggregate],
289 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
290 where
291 T: Entity,
292 {
293 let mut query = query.clone();
294 if let Some(behavior) = self.query_behavior(&query.entity) {
295 behavior
296 .before_select(self.repository.metadata.context, &mut query)
297 .map_err(RepositoryError::Runtime)?;
298 }
299
300 let mut rows = self.repository.fetch_all(&query)?;
301 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
302 self.enhance_query_relations(&mut rows, &query)?;
303 self.enhance_relations(&mut rows)?;
304 rows.into_iter()
305 .map(T::from_record)
306 .collect::<Result<Vec<_>, _>>()
307 .map(SmartList::from)
308 .map_err(RepositoryError::Entity)
309 }
310
311 pub fn fetch_enhanced_entities<T>(
312 &self,
313 query: &SelectQuery,
314 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
315 where
316 T: Entity,
317 {
318 let mut query = query.clone();
319 if let Some(behavior) = self.query_behavior(&query.entity) {
320 behavior
321 .before_select(self.repository.metadata.context, &mut query)
322 .map_err(RepositoryError::Runtime)?;
323 }
324
325 let mut rows = self.repository.fetch_all(&query)?;
326 self.enhance_query_relations(&mut rows, &query)?;
327 self.enhance_relations(&mut rows)?;
328 rows.into_iter()
329 .map(T::from_record)
330 .collect::<Result<Vec<_>, _>>()
331 .map(SmartList::from)
332 .map_err(RepositoryError::Entity)
333 }
334
335 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
336 let command = self
337 .prepare_insert_command(command)
338 .map_err(RepositoryError::Runtime)?;
339 self.execute_prepared_insert(command)
340 }
341
342 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
343 let command = self
344 .prepare_update_command(command)
345 .map_err(RepositoryError::Runtime)?;
346 self.execute_prepared_update(command)
347 }
348
349 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
350 let mut command = command.clone();
351 if let Some(behavior) = self.behavior() {
352 behavior
353 .before_delete(self.repository.metadata.context, &mut command)
354 .map_err(RepositoryError::Runtime)?;
355 }
356 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
357 let affected = self.repository.delete(&command)?;
358 self.emit_event(EntityEvent::deleted_with_old_values(
359 command.entity,
360 command.id,
361 command.expected_version,
362 old_values,
363 ))
364 .map_err(RepositoryError::Runtime)?;
365 Ok(affected)
366 }
367
368 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
369 let mut command = command.clone();
370 if let Some(behavior) = self.behavior() {
371 behavior
372 .before_recover(self.repository.metadata.context, &mut command)
373 .map_err(RepositoryError::Runtime)?;
374 }
375 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
376 let affected = self.repository.recover(&command)?;
377 self.emit_event(EntityEvent::recovered_with_old_values(
378 command.entity,
379 command.id,
380 command.expected_version,
381 old_values,
382 ))
383 .map_err(RepositoryError::Runtime)?;
384 Ok(affected)
385 }
386
387 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
388 self.repository.metadata.context.send_event(event)
389 }
390
391 pub(super) fn execute_prepared_insert(
392 &self,
393 command: InsertCommand,
394 ) -> Result<u64, RepositoryError<E::Error>> {
395 let affected = self.repository.insert(&command)?;
396 self.emit_event(EntityEvent::created(command.entity, command.values))
397 .map_err(RepositoryError::Runtime)?;
398 Ok(affected)
399 }
400
401 pub(super) fn execute_prepared_update(
402 &self,
403 command: UpdateCommand,
404 ) -> Result<u64, RepositoryError<E::Error>> {
405 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
406 let affected = self.repository.update(&command)?;
407 let updated_fields = command.values.keys().cloned().collect();
408 let mut values = command.values.clone();
409 values.insert("id".to_owned(), command.id.clone());
410 if let Some(version) = command.expected_version {
411 values.insert("version".to_owned(), Value::I64(version + 1));
412 }
413 let mut new_values = old_values.clone().unwrap_or_default();
414 for (field, value) in &values {
415 new_values.insert(field.clone(), value.clone());
416 }
417 self.emit_event(EntityEvent::updated_with_old_values(
418 command.entity,
419 values,
420 old_values,
421 new_values,
422 updated_fields,
423 ))
424 .map_err(RepositoryError::Runtime)?;
425 Ok(affected)
426 }
427
428 fn fetch_current_event_row(
429 &self,
430 entity: &str,
431 id: &Value,
432 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
433 if self.repository.metadata.context.event_sink.is_none() {
434 return Ok(None);
435 }
436 let descriptor = self
437 .repository
438 .metadata
439 .context
440 .require_entity(entity)
441 .map_err(RepositoryError::Runtime)?;
442 let Some(id_property) = descriptor.id_property() else {
443 return Ok(None);
444 };
445 let mut rows = self.fetch_all(
446 &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
447 )?;
448 Ok(rows.pop())
449 }
450
451 pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
452 ResolvedRepository {
453 entity,
454 repository: ContextRepository {
455 metadata: UserContextMetadata {
456 context: self.repository.metadata.context,
457 },
458 dialect: self.repository.dialect,
459 executor: self.repository.executor,
460 },
461 }
462 }
463}