Skip to main content

teaql_runtime/repository/
context.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
6    UpdateCommand,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError, SqlLogOperation,
12    UserContext,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    Repository, ResolvedRepository, UserContextMetadata,
18    helpers::invalidate_aggregation_cache_namespace,
19};
20
21impl UserContext {
22    pub fn repository<D, E>(&self) -> Result<ContextRepository<'_, D, E>, ContextError>
23    where
24        D: SqlDialect + Send + Sync + 'static,
25        E: QueryExecutor + Send + Sync + 'static,
26    {
27        if self.metadata.is_none() {
28            return Err(ContextError::MissingResource("metadata".to_owned()));
29        }
30
31        let dialect = self.require_resource::<D>()?;
32        let executor = self.require_resource::<E>()?;
33        Ok(ContextRepository {
34            metadata: UserContextMetadata { context: self },
35            dialect,
36            executor,
37        })
38    }
39
40    pub fn resolve_repository<D, E>(
41        &self,
42        entity: impl Into<String>,
43    ) -> Result<ResolvedRepository<'_, D, E>, ContextError>
44    where
45        D: SqlDialect + Send + Sync + 'static,
46        E: QueryExecutor + Send + Sync + 'static,
47    {
48        let entity = entity.into();
49        if !self.has_repository(&entity) {
50            return Err(ContextError::MissingRepository(entity));
51        }
52        Ok(ResolvedRepository {
53            entity,
54            repository: self.repository::<D, E>()?,
55        })
56    }
57
58    pub fn plan_for_save_graph<D, E>(
59        &self,
60        node: GraphNode,
61    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
62    where
63        D: SqlDialect + Send + Sync + 'static,
64        E: QueryExecutor + Send + Sync + 'static,
65    {
66        let repository = self
67            .resolve_repository::<D, E>(node.entity.clone())
68            .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
69        repository.plan_graph(node)
70    }
71}
72
73impl<'a, D, E> ContextRepository<'a, D, E>
74where
75    D: SqlDialect,
76    E: QueryExecutor,
77{
78    fn repository(&self) -> Repository<'_, D, UserContextMetadata<'_>, E> {
79        Repository::new(self.dialect, &self.metadata, self.executor)
80    }
81
82    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
83        self.repository().compile(query)
84    }
85
86    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
87        let mut compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
88        let final_comment = self.resolve_final_comment(query.comment.clone());
89        compiled.comment = final_comment;
90
91        let started_at = SystemTime::now();
92        let started = Instant::now();
93        let rows = self
94            .executor
95            .fetch_all(&compiled)
96            .map_err(RepositoryError::Executor)?;
97        self.log_sql_result(
98            SqlLogOperation::Select,
99            &compiled,
100            started_at,
101            started,
102            Some(rows.len()),
103            Some(query.entity.clone()),
104            None,
105            query.comment.clone(),
106        );
107        Ok(rows)
108    }
109
110    pub fn fetch_smart_list(
111        &self,
112        query: &SelectQuery,
113    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
114        self.repository().fetch_smart_list(query)
115    }
116
117    pub fn fetch_entities<T>(
118        &self,
119        query: &SelectQuery,
120    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
121    where
122        T: Entity,
123    {
124        self.repository().fetch_entities(query)
125    }
126
127    pub fn fetch_enhanced_entities<T>(
128        &self,
129        query: &SelectQuery,
130    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
131    where
132        T: Entity,
133    {
134        self.repository().fetch_enhanced_entities(query)
135    }
136
137    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
138        let mut compiled = self
139            .repository()
140            .compile_insert(command)
141            .map_err(RepositoryError::Runtime)?;
142        let final_comment = self.resolve_final_comment(None);
143        compiled.comment = final_comment;
144
145        let started_at = SystemTime::now();
146        let started = Instant::now();
147        let affected = self
148            .executor
149            .execute(&compiled)
150            .map_err(RepositoryError::Executor)?;
151        self.log_sql_result(
152            SqlLogOperation::Insert,
153            &compiled,
154            started_at,
155            started,
156            None,
157            None,
158            Some(affected),
159            None,
160        );
161        self.invalidate_aggregation_cache_for(&command.entity);
162        Ok(affected)
163    }
164
165    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
166        let affected = self.execute_mutation(
167            SqlLogOperation::Update,
168            &command.entity,
169            self.repository()
170                .compile_update(command)
171                .map_err(RepositoryError::Runtime)?,
172        )?;
173        if command.expected_version.is_some() && affected == 0 {
174            return Err(RepositoryError::Runtime(
175                RuntimeError::OptimisticLockConflict {
176                    entity: command.entity.clone(),
177                    id: format!("{:?}", command.id),
178                },
179            ));
180        }
181        Ok(affected)
182    }
183
184    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
185        let affected = self.execute_mutation(
186            SqlLogOperation::Delete,
187            &command.entity,
188            self.repository()
189                .compile_delete(command)
190                .map_err(RepositoryError::Runtime)?,
191        )?;
192        if command.expected_version.is_some() && affected == 0 {
193            return Err(RepositoryError::Runtime(
194                RuntimeError::OptimisticLockConflict {
195                    entity: command.entity.clone(),
196                    id: format!("{:?}", command.id),
197                },
198            ));
199        }
200        Ok(affected)
201    }
202
203    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
204        let affected = self.execute_mutation(
205            SqlLogOperation::Recover,
206            &command.entity,
207            self.repository()
208                .compile_recover(command)
209                .map_err(RepositoryError::Runtime)?,
210        )?;
211        if affected == 0 {
212            return Err(RepositoryError::Runtime(
213                RuntimeError::OptimisticLockConflict {
214                    entity: command.entity.clone(),
215                    id: format!("{:?}", command.id),
216                },
217            ));
218        }
219        Ok(affected)
220    }
221
222    fn execute_mutation(
223        &self,
224        operation: SqlLogOperation,
225        entity: &str,
226        mut compiled: CompiledQuery,
227    ) -> Result<u64, RepositoryError<E::Error>> {
228        let final_comment = self.resolve_final_comment(None);
229        compiled.comment = final_comment;
230
231        let started_at = SystemTime::now();
232        let started = Instant::now();
233        let affected = self
234            .executor
235            .execute(&compiled)
236            .map_err(RepositoryError::Executor)?;
237        self.log_sql_result(
238            operation,
239            &compiled,
240            started_at,
241            started,
242            None,
243            None,
244            Some(affected),
245            None,
246        );
247        self.invalidate_aggregation_cache_for(entity);
248        Ok(affected)
249    }
250
251    pub(super) fn log_sql_result(
252        &self,
253        operation: SqlLogOperation,
254        compiled: &CompiledQuery,
255        started_at: SystemTime,
256        started: Instant,
257        result_count: Option<usize>,
258        result_type: Option<String>,
259        affected_rows: Option<u64>,
260        comment: Option<String>,
261    ) {
262        self.metadata.context.record_sql_log(
263            operation,
264            compiled,
265            self.dialect.kind(),
266            started_at,
267            SystemTime::now(),
268            started.elapsed(),
269            result_count,
270            result_type,
271            affected_rows,
272            comment,
273        );
274    }
275
276    pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
277        if let Some(cache) = self
278            .metadata
279            .context
280            .get_resource::<Arc<dyn AggregationCacheBackend>>()
281        {
282            invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
283        }
284        if let Some(cache) = self
285            .metadata
286            .context
287            .get_resource::<InMemoryAggregationCache>()
288        {
289            invalidate_aggregation_cache_namespace(cache, entity);
290        }
291    }
292
293    pub(crate) fn resolve_final_comment(&self, comment: Option<String>) -> Option<String> {
294        let stack_comment = self.metadata.context.comment_stack.lock().ok().and_then(|stack| {
295            if stack.is_empty() {
296                None
297            } else {
298                Some(stack.join("->"))
299            }
300        });
301        let business_comment = stack_comment.or(comment);
302        let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
303
304        match (user_id, business_comment) {
305            (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
306                Some(format!("[{user}] {bus}"))
307            }
308            (Some(user), _) if !user.is_empty() => {
309                Some(format!("[{user}]"))
310            }
311            (_, Some(bus)) if !bus.is_empty() => {
312                Some(bus)
313            }
314            _ => None,
315        }
316    }
317}