teaql_runtime/repository/
base.rs1use teaql_core::{
2 BatchInsertCommand, BatchUpdateCommand, DeleteCommand, Entity, InsertCommand, Record,
3 RecoverCommand, SelectQuery, SmartList, UpdateCommand,
4};
5use teaql_data_service::{MutationRequest, QueryRequest};
6
7use crate::{MetadataStore, RepositoryError, RuntimeError};
8
9use super::Repository;
10
11impl<'a, M, E> Repository<'a, M, E>
12where
13 M: MetadataStore,
14 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor,
15{
16 pub fn new(metadata: &'a M, executor: &'a E) -> Self {
17 Self {
18 metadata,
19 executor,
20 }
21 }
22
23 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
24 let request = QueryRequest {
25 query: query.clone(),
26 trace_chain: query.trace_chain.clone(),
27 comment: query.comment.clone(),
28 };
29 let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
30 Ok(res.rows)
31 }
32
33 pub async fn fetch_smart_list(
34 &self,
35 query: &SelectQuery,
36 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
37 let request = QueryRequest {
38 query: query.clone(),
39 trace_chain: query.trace_chain.clone(),
40 comment: query.comment.clone(),
41 };
42 let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
43 self.metadata.record_metadata_log(&res.metadata);
44 Ok(SmartList::from(res.rows))
45 }
46
47 pub async fn fetch_entities<T>(
48 &self,
49 query: &SelectQuery,
50 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
51 where
52 T: Entity,
53 {
54 self.fetch_all(query).await?
55 .into_iter()
56 .map(T::from_record)
57 .collect::<Result<Vec<_>, _>>()
58 .map(SmartList::from)
59 .map_err(RepositoryError::Entity)
60 }
61
62 pub async fn fetch_enhanced_entities<T>(
63 &self,
64 query: &SelectQuery,
65 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
66 where
67 T: Entity,
68 {
69 self.fetch_entities(query).await
70 }
71
72 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
73 let request = MutationRequest::Insert(command.clone());
74 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
75 self.metadata.record_metadata_log(&res.metadata);
76 Ok(res.affected_rows)
77 }
78
79 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
80 let request = MutationRequest::Update(command.clone());
81 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
82 self.metadata.record_metadata_log(&res.metadata);
83 let affected = res.affected_rows;
84
85 if command.expected_version.is_some() && affected == 0 {
86 return Err(RepositoryError::Runtime(
87 RuntimeError::OptimisticLockConflict {
88 entity: command.entity.clone(),
89 id: format!("{:?}", command.id),
90 },
91 ));
92 }
93
94 Ok(affected)
95 }
96
97 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
98 let request = MutationRequest::Delete(command.clone());
99 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
100 self.metadata.record_metadata_log(&res.metadata);
101 let affected = res.affected_rows;
102
103 if command.expected_version.is_some() && affected == 0 {
104 return Err(RepositoryError::Runtime(
105 RuntimeError::OptimisticLockConflict {
106 entity: command.entity.clone(),
107 id: format!("{:?}", command.id),
108 },
109 ));
110 }
111
112 Ok(affected)
113 }
114
115 pub async fn batch_insert(
116 &self,
117 command: &teaql_core::BatchInsertCommand,
118 ) -> Result<u64, RepositoryError<E::Error>> {
119 let mut reqs = Vec::new();
121 for (i, val) in command.batch_values.iter().enumerate() {
122 let mut insert_cmd = InsertCommand::new(command.entity.clone());
123 insert_cmd.values = val.clone();
124 if i < command.trace_chains.len() {
125 insert_cmd.trace_chain = command.trace_chains[i].clone();
126 }
127 reqs.push(MutationRequest::Insert(insert_cmd));
128 }
129 let request = MutationRequest::Batch(reqs);
130 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
131 self.metadata.record_metadata_log(&res.metadata);
132 Ok(res.affected_rows)
133 }
134
135 pub async fn batch_update(
136 &self,
137 command: &teaql_core::BatchUpdateCommand,
138 ) -> Result<u64, RepositoryError<E::Error>> {
139 let mut reqs = Vec::new();
140 for (i, val) in command.batch_values.iter().enumerate() {
141 let mut update_cmd = UpdateCommand::new(command.entity.clone(), command.batch_ids[i].clone());
142 update_cmd.values = val.clone();
143 if let Some(Some(v)) = command.batch_expected_versions.get(i) {
144 update_cmd.expected_version = Some(*v);
145 }
146 if let Some(old) = command.batch_old_values.get(i) {
147 update_cmd.old_values = old.clone();
148 }
149 if i < command.trace_chains.len() {
150 update_cmd.trace_chain = command.trace_chains[i].clone();
151 }
152 reqs.push(MutationRequest::Update(update_cmd));
153 }
154 let request = MutationRequest::Batch(reqs);
155 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
156 self.metadata.record_metadata_log(&res.metadata);
157 let affected = res.affected_rows;
158
159 if command.batch_expected_versions.iter().any(|v| v.is_some()) {
160 if affected != command.batch_ids.len() as u64 {
161 return Err(RepositoryError::Runtime(
162 RuntimeError::OptimisticLockConflict {
163 entity: command.entity.clone(),
164 id: "BATCH".to_owned(),
165 },
166 ));
167 }
168 }
169
170 Ok(affected)
171 }
172
173 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
174 let request = MutationRequest::Recover(command.clone());
175 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
176 self.metadata.record_metadata_log(&res.metadata);
177 let affected = res.affected_rows;
178
179 if affected == 0 {
180 return Err(RepositoryError::Runtime(
181 RuntimeError::OptimisticLockConflict {
182 entity: command.entity.clone(),
183 id: format!("{:?}", command.id),
184 },
185 ));
186 }
187
188 Ok(affected)
189 }
190
191 pub async fn insert_many(
192 &self,
193 commands: &[InsertCommand],
194 ) -> Result<u64, RepositoryError<E::Error>> {
195 let mut total = 0;
196 for command in commands {
197 total += self.insert(command).await?;
198 }
199 Ok(total)
200 }
201
202 pub async fn update_many(
203 &self,
204 commands: &[UpdateCommand],
205 ) -> Result<u64, RepositoryError<E::Error>> {
206 let mut total = 0;
207 for command in commands {
208 total += self.update(command).await?;
209 }
210 Ok(total)
211 }
212
213 pub async fn delete_many(
214 &self,
215 commands: &[DeleteCommand],
216 ) -> Result<u64, RepositoryError<E::Error>> {
217 let mut total = 0;
218 for command in commands {
219 total += self.delete(command).await?;
220 }
221 Ok(total)
222 }
223
224 pub async fn recover_many(
225 &self,
226 commands: &[RecoverCommand],
227 ) -> Result<u64, RepositoryError<E::Error>> {
228 let mut total = 0;
229 for command in commands {
230 total += self.recover(command).await?;
231 }
232 Ok(total)
233 }
234}