Skip to main content

teaql_runtime/repository/
context.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
5    UpdateCommand,
6};
7
8use crate::{
9    ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError,
10    UserContext,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    Repository, ResolvedRepository, UserContextMetadata,
16    helpers::invalidate_aggregation_cache_namespace,
17};
18
19impl UserContext {
20    pub fn repository<E>(&self) -> Result<ContextRepository<'_, E>, ContextError>
21    where
22        E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
23    {
24        if self.metadata.is_none() {
25            return Err(ContextError::MissingResource("metadata".to_owned()));
26        }
27
28        let executor = self.require_resource::<E>()?;
29        Ok(ContextRepository {
30            metadata: UserContextMetadata { context: self },
31            executor,
32        })
33    }
34
35    #[doc(hidden)]
36    pub fn resolve_repository<E>(
37        &self,
38        entity: impl Into<String>,
39    ) -> Result<ResolvedRepository<'_, E>, ContextError>
40    where
41        E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
42    {
43        let entity = entity.into();
44        if !self.has_repository(&entity) {
45            return Err(ContextError::MissingRepository(entity));
46        }
47        Ok(ResolvedRepository {
48            entity,
49            repository: self.repository::<E>()?,
50            trace_context: Vec::new(),
51        })
52    }
53
54    pub async fn plan_for_save_graph<E>(
55        &self,
56        node: GraphNode,
57    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
58    where
59        E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
60    {
61        let repository = self
62            .resolve_repository::<E>(node.entity.clone())
63            .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
64        repository.plan_graph(node).await
65    }
66}
67
68impl<'a, E> ContextRepository<'a, E>
69where
70    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
71{
72    fn repository(&self) -> Repository<'_, UserContextMetadata<'_>, E> {
73        Repository::new(&self.metadata, self.executor)
74    }
75
76    pub async fn fetch_all(&self, mut query: SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
77        let final_comment = self.resolve_final_comment(&query.trace_chain, query.comment.clone());
78        query.comment = final_comment;
79        self.repository().fetch_all(&query).await
80    }
81
82    pub async fn fetch_smart_list(
83        &self,
84        query: &SelectQuery,
85    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
86        self.repository().fetch_smart_list(query).await
87    }
88
89    pub async fn fetch_entities<T>(
90        &self,
91        query: &SelectQuery,
92    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
93    where
94        T: Entity,
95    {
96        self.repository().fetch_entities(query).await
97    }
98
99    pub async fn fetch_enhanced_entities<T>(
100        &self,
101        query: &SelectQuery,
102    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
103    where
104        T: Entity,
105    {
106        self.repository().fetch_enhanced_entities(query).await
107    }
108
109    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
110        let affected = self.repository().insert(command).await?;
111        self.invalidate_aggregation_cache_for(&command.entity);
112        Ok(affected)
113    }
114
115    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
116        let affected = self.repository().update(command).await?;
117        self.invalidate_aggregation_cache_for(&command.entity);
118        Ok(affected)
119    }
120
121    pub async fn batch_insert(&self, command: &teaql_core::BatchInsertCommand) -> Result<u64, RepositoryError<E::Error>> {
122        let affected = self.repository().batch_insert(command).await?;
123        self.invalidate_aggregation_cache_for(&command.entity);
124        Ok(affected)
125    }
126
127    pub async fn batch_update(&self, command: &teaql_core::BatchUpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
128        let affected = self.repository().batch_update(command).await?;
129        self.invalidate_aggregation_cache_for(&command.entity);
130        Ok(affected)
131    }
132
133    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
134        let affected = self.repository().delete(command).await?;
135        self.invalidate_aggregation_cache_for(&command.entity);
136        Ok(affected)
137    }
138
139    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
140        let affected = self.repository().recover(command).await?;
141        self.invalidate_aggregation_cache_for(&command.entity);
142        Ok(affected)
143    }
144
145    pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
146        if let Some(cache) = self
147            .metadata
148            .context
149            .get_resource::<Arc<dyn AggregationCacheBackend>>()
150        {
151            invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
152        }
153        if let Some(cache) = self
154            .metadata
155            .context
156            .get_resource::<InMemoryAggregationCache>()
157        {
158            invalidate_aggregation_cache_namespace(cache, entity);
159        }
160    }
161
162    pub(crate) fn resolve_final_comment(&self, trace_chain: &[teaql_core::TraceNode], comment: Option<String>) -> Option<String> {
163        let chain_str = if trace_chain.is_empty() {
164            None
165        } else {
166            let formatted = trace_chain.iter().map(|n| {
167                format!("{}({}): {}", n.entity_type, n.entity_id.map(|id| id.to_string()).unwrap_or_else(|| "pending".to_owned()), n.comment)
168            }).collect::<Vec<_>>().join(" -> ");
169            Some(formatted)
170        };
171
172        let business_comment = chain_str.or(comment);
173        let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
174
175        match (user_id, business_comment) {
176            (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
177                Some(format!("[{user}] {bus}"))
178            }
179            (Some(user), _) if !user.is_empty() => {
180                Some(format!("[{user}]"))
181            }
182            (_, Some(bus)) if !bus.is_empty() => {
183                Some(bus)
184            }
185            _ => None,
186        }
187    }
188}