Skip to main content

teaql_runtime/repository/
base.rs

1use teaql_core::{
2    BatchInsertCommand, BatchUpdateCommand, DeleteCommand, Entity, InsertCommand, Record,
3    RecoverCommand, SelectQuery, SmartList, UpdateCommand,
4};
5use teaql_data_service::{MutationRequest, QueryRequest};
6
7use crate::{MetadataStore, RepositoryError, RuntimeError};
8
9use super::Repository;
10
11impl<'a, M, E> Repository<'a, M, E>
12where
13    M: MetadataStore,
14    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor,
15{
16    pub fn new(metadata: &'a M, executor: &'a E) -> Self {
17        Self {
18            metadata,
19            executor,
20        }
21    }
22
23    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
24        let request = QueryRequest {
25            query: query.clone(),
26            trace_chain: query.trace_chain.clone(),
27            comment: query.comment.clone(),
28        };
29        let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
30        Ok(res.rows)
31    }
32
33    pub async fn fetch_smart_list(
34        &self,
35        query: &SelectQuery,
36    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
37        let request = QueryRequest {
38            query: query.clone(),
39            trace_chain: query.trace_chain.clone(),
40            comment: query.comment.clone(),
41        };
42        let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
43        self.metadata.record_metadata_log(&res.metadata);
44        Ok(SmartList::from(res.rows))
45    }
46
47    pub async fn fetch_entities<T>(
48        &self,
49        query: &SelectQuery,
50    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
51    where
52        T: Entity,
53    {
54        self.fetch_all(query).await?
55            .into_iter()
56            .map(T::from_record)
57            .collect::<Result<Vec<_>, _>>()
58            .map(SmartList::from)
59            .map_err(RepositoryError::Entity)
60    }
61
62    pub async fn fetch_enhanced_entities<T>(
63        &self,
64        query: &SelectQuery,
65    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
66    where
67        T: Entity,
68    {
69        self.fetch_entities(query).await
70    }
71
72    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
73        let request = MutationRequest::Insert(command.clone());
74        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
75        self.metadata.record_metadata_log(&res.metadata);
76        Ok(res.affected_rows)
77    }
78
79    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
80        let request = MutationRequest::Update(command.clone());
81        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
82        self.metadata.record_metadata_log(&res.metadata);
83        let affected = res.affected_rows;
84
85        if command.expected_version.is_some() && affected == 0 {
86            return Err(RepositoryError::Runtime(
87                RuntimeError::OptimisticLockConflict {
88                    entity: command.entity.clone(),
89                    id: format!("{:?}", command.id),
90                },
91            ));
92        }
93
94        Ok(affected)
95    }
96
97    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
98        let request = MutationRequest::Delete(command.clone());
99        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
100        self.metadata.record_metadata_log(&res.metadata);
101        let affected = res.affected_rows;
102
103        if command.expected_version.is_some() && affected == 0 {
104            return Err(RepositoryError::Runtime(
105                RuntimeError::OptimisticLockConflict {
106                    entity: command.entity.clone(),
107                    id: format!("{:?}", command.id),
108                },
109            ));
110        }
111
112        Ok(affected)
113    }
114
115    pub async fn batch_insert(
116        &self,
117        command: &teaql_core::BatchInsertCommand,
118    ) -> Result<u64, RepositoryError<E::Error>> {
119        // Build individual InsertCommands for now, or use BatchMutation if appropriate
120        let mut reqs = Vec::new();
121        for (i, val) in command.batch_values.iter().enumerate() {
122            let mut insert_cmd = InsertCommand::new(command.entity.clone());
123            insert_cmd.values = val.clone();
124            if i < command.trace_chains.len() {
125                insert_cmd.trace_chain = command.trace_chains[i].clone();
126            }
127            reqs.push(MutationRequest::Insert(insert_cmd));
128        }
129        let request = MutationRequest::Batch(reqs);
130        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
131        self.metadata.record_metadata_log(&res.metadata);
132        Ok(res.affected_rows)
133    }
134
135    pub async fn batch_update(
136        &self,
137        command: &teaql_core::BatchUpdateCommand,
138    ) -> Result<u64, RepositoryError<E::Error>> {
139        let mut reqs = Vec::new();
140        for (i, val) in command.batch_values.iter().enumerate() {
141            let mut update_cmd = UpdateCommand::new(command.entity.clone(), command.batch_ids[i].clone());
142            update_cmd.values = val.clone();
143            if let Some(Some(v)) = command.batch_expected_versions.get(i) {
144                update_cmd.expected_version = Some(*v);
145            }
146            if let Some(old) = command.batch_old_values.get(i) {
147                update_cmd.old_values = old.clone();
148            }
149            if i < command.trace_chains.len() {
150                update_cmd.trace_chain = command.trace_chains[i].clone();
151            }
152            reqs.push(MutationRequest::Update(update_cmd));
153        }
154        let request = MutationRequest::Batch(reqs);
155        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
156        self.metadata.record_metadata_log(&res.metadata);
157        let affected = res.affected_rows;
158
159        if command.batch_expected_versions.iter().any(|v| v.is_some()) {
160            if affected != command.batch_ids.len() as u64 {
161                return Err(RepositoryError::Runtime(
162                    RuntimeError::OptimisticLockConflict {
163                        entity: command.entity.clone(),
164                        id: "BATCH".to_owned(),
165                    },
166                ));
167            }
168        }
169
170        Ok(affected)
171    }
172
173    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
174        let request = MutationRequest::Recover(command.clone());
175        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
176        self.metadata.record_metadata_log(&res.metadata);
177        let affected = res.affected_rows;
178
179        if affected == 0 {
180            return Err(RepositoryError::Runtime(
181                RuntimeError::OptimisticLockConflict {
182                    entity: command.entity.clone(),
183                    id: format!("{:?}", command.id),
184                },
185            ));
186        }
187
188        Ok(affected)
189    }
190
191    pub async fn insert_many(
192        &self,
193        commands: &[InsertCommand],
194    ) -> Result<u64, RepositoryError<E::Error>> {
195        let mut total = 0;
196        for command in commands {
197            total += self.insert(command).await?;
198        }
199        Ok(total)
200    }
201
202    pub async fn update_many(
203        &self,
204        commands: &[UpdateCommand],
205    ) -> Result<u64, RepositoryError<E::Error>> {
206        let mut total = 0;
207        for command in commands {
208            total += self.update(command).await?;
209        }
210        Ok(total)
211    }
212
213    pub async fn delete_many(
214        &self,
215        commands: &[DeleteCommand],
216    ) -> Result<u64, RepositoryError<E::Error>> {
217        let mut total = 0;
218        for command in commands {
219            total += self.delete(command).await?;
220        }
221        Ok(total)
222    }
223
224    pub async fn recover_many(
225        &self,
226        commands: &[RecoverCommand],
227    ) -> Result<u64, RepositoryError<E::Error>> {
228        let mut total = 0;
229        for command in commands {
230            total += self.recover(command).await?;
231        }
232        Ok(total)
233    }
234}