teaql_runtime/repository/
base.rs1use teaql_core::{
2 BatchInsertCommand, BatchUpdateCommand, DeleteCommand, Entity, InsertCommand, Record,
3 RecoverCommand, SelectQuery, SmartList, UpdateCommand,
4};
5use teaql_data_service::{MutationRequest, QueryRequest};
6
7use crate::{MetadataStore, RepositoryError, RuntimeError};
8
9use super::Repository;
10
11impl<'a, M, E> Repository<'a, M, E>
12where
13 M: MetadataStore,
14 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor,
15{
16 pub fn new(metadata: &'a M, executor: &'a E) -> Self {
17 Self {
18 metadata,
19 executor,
20 }
21 }
22
23 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
24 let request = QueryRequest {
25 query: query.clone(),
26 trace_chain: query.trace_chain.clone(),
27 comment: query.comment.clone(),
28 };
29 let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
30 Ok(res.rows)
31 }
32
33 pub async fn fetch_smart_list(
34 &self,
35 query: &SelectQuery,
36 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
37 let request = QueryRequest {
38 query: query.clone(),
39 trace_chain: query.trace_chain.clone(),
40 comment: query.comment.clone(),
41 };
42 let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
43 self.metadata.record_metadata_log(&res.metadata);
44 Ok(SmartList::from(res.rows))
45 }
46
47 pub async fn fetch_entities<T>(
48 &self,
49 query: &SelectQuery,
50 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
51 where
52 T: Entity,
53 {
54 self.fetch_all(query).await?
55 .into_iter()
56 .map(T::from_record)
57 .collect::<Result<Vec<_>, _>>()
58 .map(SmartList::from)
59 .map_err(RepositoryError::Entity)
60 }
61
62 pub async fn fetch_enhanced_entities<T>(
63 &self,
64 query: &SelectQuery,
65 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
66 where
67 T: Entity,
68 {
69 self.fetch_entities(query).await
70 }
71
72 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
73 let request = MutationRequest::Insert(command.clone());
74 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
75 self.metadata.record_metadata_log(&res.metadata);
76 Ok(res.affected_rows)
77 }
78
79 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
80 let request = MutationRequest::Update(command.clone());
81 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
82 self.metadata.record_metadata_log(&res.metadata);
83 let affected = res.affected_rows;
84
85 if command.expected_version.is_some() && affected == 0 {
86 println!("OptimisticLockConflict in base.rs update! entity={}, id={:?}", command.entity, command.id);
87 println!("Backtrace: {:#?}", std::backtrace::Backtrace::force_capture());
88 return Err(RepositoryError::Runtime(
89 RuntimeError::OptimisticLockConflict {
90 entity: command.entity.clone(),
91 id: format!("{:?}", command.id),
92 },
93 ));
94 }
95
96 Ok(affected)
97 }
98
99 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
100 let request = MutationRequest::Delete(command.clone());
101 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
102 self.metadata.record_metadata_log(&res.metadata);
103 let affected = res.affected_rows;
104
105 if command.expected_version.is_some() && affected == 0 {
106 return Err(RepositoryError::Runtime(
107 RuntimeError::OptimisticLockConflict {
108 entity: command.entity.clone(),
109 id: format!("{:?}", command.id),
110 },
111 ));
112 }
113
114 Ok(affected)
115 }
116
117 pub async fn batch_insert(
118 &self,
119 command: &teaql_core::BatchInsertCommand,
120 ) -> Result<u64, RepositoryError<E::Error>> {
121 let mut affected = 0;
123 for (i, val) in command.batch_values.iter().enumerate() {
124 let mut insert_cmd = InsertCommand::new(command.entity.clone());
125 insert_cmd.values = val.clone();
126 if i < command.trace_chains.len() {
127 insert_cmd.trace_chain = command.trace_chains[i].clone();
128 }
129 let res = self.executor.mutate(MutationRequest::Insert(insert_cmd)).await.map_err(RepositoryError::Executor)?;
130 self.metadata.record_metadata_log(&res.metadata);
131 affected += res.affected_rows;
132 }
133 Ok(affected)
134 }
135
136 pub async fn batch_update(
137 &self,
138 command: &teaql_core::BatchUpdateCommand,
139 ) -> Result<u64, RepositoryError<E::Error>> {
140 let mut affected = 0;
141 for (i, val) in command.batch_values.iter().enumerate() {
142 let mut update_cmd = UpdateCommand::new(command.entity.clone(), command.batch_ids[i].clone());
143
144 let mut filtered_values = Record::new();
145 for field in &command.update_fields {
146 if let Some(v) = val.get(field) {
147 filtered_values.insert(field.clone(), v.clone());
148 }
149 }
150 update_cmd.values = filtered_values;
151 if let Some(Some(v)) = command.batch_expected_versions.get(i) {
152 update_cmd.expected_version = Some(*v);
153 }
154 if let Some(old) = command.batch_old_values.get(i) {
155 update_cmd.old_values = old.clone();
156 }
157 if i < command.trace_chains.len() {
158 update_cmd.trace_chain = command.trace_chains[i].clone();
159 }
160 let res = self.executor.mutate(MutationRequest::Update(update_cmd)).await.map_err(RepositoryError::Executor)?;
161 self.metadata.record_metadata_log(&res.metadata);
162 affected += res.affected_rows;
163 }
164
165 if command.batch_expected_versions.iter().any(|v| v.is_some()) {
166 if affected != command.batch_ids.len() as u64 {
167 println!("OptimisticLockConflict in batch_update! entity={}, affected={}, expected={}", command.entity, affected, command.batch_ids.len());
168 return Err(RepositoryError::Runtime(
169 RuntimeError::OptimisticLockConflict {
170 entity: command.entity.clone(),
171 id: "BATCH".to_owned(),
172 },
173 ));
174 }
175 }
176
177 Ok(affected)
178 }
179
180 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
181 let request = MutationRequest::Recover(command.clone());
182 let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
183 self.metadata.record_metadata_log(&res.metadata);
184 let affected = res.affected_rows;
185
186 if affected == 0 {
187 return Err(RepositoryError::Runtime(
188 RuntimeError::OptimisticLockConflict {
189 entity: command.entity.clone(),
190 id: format!("{:?}", command.id),
191 },
192 ));
193 }
194
195 Ok(affected)
196 }
197
198 pub async fn insert_many(
199 &self,
200 commands: &[InsertCommand],
201 ) -> Result<u64, RepositoryError<E::Error>> {
202 let mut total = 0;
203 for command in commands {
204 total += self.insert(command).await?;
205 }
206 Ok(total)
207 }
208
209 pub async fn update_many(
210 &self,
211 commands: &[UpdateCommand],
212 ) -> Result<u64, RepositoryError<E::Error>> {
213 let mut total = 0;
214 for command in commands {
215 total += self.update(command).await?;
216 }
217 Ok(total)
218 }
219
220 pub async fn delete_many(
221 &self,
222 commands: &[DeleteCommand],
223 ) -> Result<u64, RepositoryError<E::Error>> {
224 let mut total = 0;
225 for command in commands {
226 total += self.delete(command).await?;
227 }
228 Ok(total)
229 }
230
231 pub async fn recover_many(
232 &self,
233 commands: &[RecoverCommand],
234 ) -> Result<u64, RepositoryError<E::Error>> {
235 let mut total = 0;
236 for command in commands {
237 total += self.recover(command).await?;
238 }
239 Ok(total)
240 }
241}