artemis_normalized_cache/cache_exchange/
mod.rs1use crate::{
5 store::Store,
6 types::{NormalizedCacheExtension, NormalizedCacheOptions},
7 HashSet
8};
9use artemis::{
10 exchange::{
11 Client, Exchange, ExchangeFactory, ExchangeResult, Operation, OperationResult,
12 OperationType
13 },
14 DebugInfo, GraphQLQuery, QueryError, RequestPolicy, Response, ResultSource
15};
16#[cfg(target_arch = "wasm32")]
17use parking_lot::RwLock;
18#[cfg(target_arch = "wasm32")]
19use serde::de::DeserializeOwned;
20use std::{collections::HashMap, ptr, sync::Arc};
21#[cfg(target_arch = "wasm32")]
22use wasm_bindgen::JsValue;
23
24#[cfg(test)]
25mod tests;
26
27#[derive(Default)]
29pub struct NormalizedCacheExchange {
30 options: Option<NormalizedCacheOptions>
31}
32
33impl NormalizedCacheExchange {
34 #[allow(unused)]
36 pub fn with_options(options: NormalizedCacheOptions) -> Self {
37 Self {
38 options: Some(options)
39 }
40 }
41
42 #[allow(unused)]
44 pub fn new() -> Self {
45 Self { options: None }
46 }
47}
48
49impl<TNext: Exchange> ExchangeFactory<TNext> for NormalizedCacheExchange {
50 type Output = NormalizedCacheImpl<TNext>;
51
52 fn build(self, next: TNext) -> NormalizedCacheImpl<TNext> {
53 let options = self.options.unwrap_or_else(NormalizedCacheOptions::default);
54 let store = Store::new(options.custom_keys.unwrap_or_else(HashMap::default));
55 NormalizedCacheImpl {
56 next,
57 store: Arc::new(store),
58 #[cfg(target_arch = "wasm32")]
59 updaters: HashMap::new()
60 }
61 }
62}
63
64pub struct NormalizedCacheImpl<TNext: Exchange> {
67 next: TNext,
68 store: Arc<Store>,
69 #[cfg(target_arch = "wasm32")]
70 updaters: Arc<RwLock<HashMap<u64, Box<dyn Fn(JsValue, js_sys::Function, *mut usize)>>>>
71}
72
73fn should_cache<Q: GraphQLQuery>(operation: &Operation<Q::Variables>) -> bool {
74 operation.meta.operation_type == OperationType::Query
75 && operation.options.request_policy != RequestPolicy::NetworkOnly
76}
77
78fn is_optimistic_mutation<Q: GraphQLQuery>(op: &Operation<Q::Variables>) -> bool {
79 op.meta.operation_type == OperationType::Mutation
80 && op.options.request_policy != RequestPolicy::NetworkOnly
81}
82
83impl<TNext: Exchange> NormalizedCacheImpl<TNext> {
84 #[cfg(target_arch = "wasm32")]
85 fn write_updater<Q: GraphQLQuery>(&self, operation: &Operation<Q::Variables>)
86 where
87 Q::Variables: DeserializeOwned
88 {
89 if self.updaters.read().contains_key(&operation.key) {
90 let updater = {
91 let store = self.store.clone();
92 move |variables: Value, updater_fn: js_sys::Function, dependencies: *mut usize| {
93 store.update_query_js::<Q>(variables, updater_fn, dependencies);
94 }
95 };
96 self.updaters
97 .write()
98 .insert(operation.key, Box::new(updater));
99 }
100 }
101
102 #[cfg(not(target_arch = "wasm32"))]
103 fn write_updater<Q: GraphQLQuery>(&self, _operation: &Operation<Q::Variables>) {}
104
105 fn write_query<Q: GraphQLQuery, C: Client>(
106 &self,
107 result: &OperationResult<Q::ResponseData>,
108 variables: &Q::Variables,
109 client: &C
110 ) -> Result<(), QueryError> {
111 let query_key = result.key;
112 let mut dependencies = HashSet::default();
113 if result.response.errors.is_some() {
114 self.store.clear_optimistic_layer(query_key);
115 } else {
116 self.store
117 .write_query::<Q>(result, variables, false, &mut dependencies)?;
118 self.store.rerun_queries(dependencies, query_key, client);
119 self.store.clear_optimistic_layer(query_key);
120 }
121 Ok(())
122 }
123
124 fn invalidate_and_update<Q: GraphQLQuery, C: Client>(
125 &self,
126 result: &OperationResult<Q::ResponseData>,
127 variables: Q::Variables,
128 client: &C,
129 extension: Option<&NormalizedCacheExtension>
130 ) {
131 let query_key = result.key;
132 let mut dependencies = HashSet::default();
133 self.store
134 .write_query::<Q>(result, &variables, false, &mut dependencies)
135 .unwrap();
136 self.store
137 .invalidate_query::<Q>(result, &variables, false, &mut dependencies);
138 if let Some(updater) = extension.and_then(|ext| ext.update.as_ref()) {
139 updater(
140 &result.response.data,
141 self.store.clone().into(),
142 &mut dependencies
143 );
144 } else {
145 self.update_js::<Q>(extension, result.response.data.as_ref(), &mut dependencies);
146 }
147 println!("Invalidated dependencies {:?}", dependencies);
148 self.store.rerun_queries(dependencies, query_key, client);
149 }
150
151 #[cfg(target_arch = "wasm32")]
152 fn update_js<Q: GraphQLQuery>(
153 &self,
154 extension: Option<&NormalizedCacheExtension>,
155 data: Option<&Q::ResponseData>,
156 dependencies: &mut Vec<String>
157 ) {
158 if let Some(updater) = extension.and_then(|ext| ext.update_js) {
159 let this = JsValue::NULL;
160 let data = serde_wasm_bindgen::to_value(data).unwrap();
161 let dependencies = dependencies as *mut _ as *mut usize;
162 updater.call3(
163 &this,
164 &data,
165 &|query: u64,
166 variables: JsValue,
167 updater: js_sys::Function,
168 dependencies: *mut usize| {
169 let updaters = self.updaters.read();
170 let updater_fn = updaters.get(&query);
171 if let Some(updater_fn) = updater_fn {
172 updater_fn(variables, updater, dependencies);
173 }
174 },
175 dependencies
176 );
177 }
178 }
179
180 #[cfg(not(target_arch = "wasm32"))]
181 fn update_js<Q: GraphQLQuery>(
182 &self,
183 _extension: Option<&NormalizedCacheExtension>,
184 _data: Option<&Q::ResponseData>,
185 _dependencies: &mut HashSet<String>
186 ) {
187 }
188
189 fn run_optimistic_query<Q: GraphQLQuery, C: Client>(
190 &self,
191 operation: &Operation<Q::Variables>,
192 client: &C,
193 extension: Option<&NormalizedCacheExtension>
194 ) {
195 if operation.options.request_policy != RequestPolicy::NetworkOnly {
196 let variables = &operation.query.variables;
197 let data = extension
198 .and_then(|ext| ext.optimistic_result.as_ref())
199 .and_then(|resolver| resolver())
200 .and_then(|result| result.downcast::<Q::ResponseData>().ok())
201 .map(|result| *result);
202
203 if let Some(data) = data {
204 let result = OperationResult {
205 key: operation.key,
206 meta: operation.meta.clone(),
207 response: Response {
208 data: Some(data),
209 debug_info: None,
210 errors: None
211 }
212 };
213
214 let query_key = operation.key;
215
216 let mut dependencies = HashSet::default();
217 self.store
218 .write_query::<Q>(&result, variables, true, &mut dependencies)
219 .unwrap();
220
221 println!("Optimistic dependencies: {:?}", dependencies);
222 self.store.rerun_queries(dependencies, query_key, client);
223 client.push_result(operation.key, Ok(result))
224 }
225 }
226 }
227
228 fn run_optimistic_mutation<Q: GraphQLQuery, C: Client>(
229 &self,
230 operation: &Operation<Q::Variables>,
231 client: &C,
232 extension: Option<&NormalizedCacheExtension>
233 ) {
234 if is_optimistic_mutation::<Q>(operation) {
235 let variables = &operation.query.variables;
236 let data = extension
237 .and_then(|extension: &NormalizedCacheExtension| {
238 extension.optimistic_result.as_ref()
239 })
240 .and_then(|resolver| resolver())
241 .and_then(|result| result.downcast::<Q::ResponseData>().ok())
242 .map(|result| *result);
243
244 if let Some(data) = data {
245 let result = OperationResult {
246 key: operation.key,
247 meta: operation.meta.clone(),
248 response: Response {
249 data: Some(data),
250 debug_info: None,
251 errors: None
252 }
253 };
254
255 let query_key = operation.key;
256 let mut dependencies = HashSet::default();
257 self.store
258 .invalidate_query::<Q>(&result, variables, true, &mut dependencies);
259 self.store
260 .write_query::<Q>(&result, variables, true, &mut dependencies)
261 .unwrap();
262 if let Some(updater) = extension.and_then(|ext| ext.update.as_ref()) {
263 updater(
264 &result.response.data,
265 self.store.clone().into(),
266 &mut dependencies
267 );
268 } else {
269 self.update_js::<Q>(
270 extension,
271 result.response.data.as_ref(),
272 &mut dependencies
273 );
274 }
275 self.store.rerun_queries(dependencies, query_key, client);
276 client.push_result(operation.key, Ok(result))
277 }
278 }
279 }
280}
281
282#[async_trait]
283impl<TNext: Exchange> Exchange for NormalizedCacheImpl<TNext> {
284 async fn run<Q: GraphQLQuery, C: Client>(
285 &self,
286 operation: Operation<Q::Variables>,
287 client: C
288 ) -> ExchangeResult<Q::ResponseData> {
289 let extension = operation
290 .options
291 .extensions
292 .as_ref()
293 .and_then(|ext| ext.get::<NormalizedCacheExtension, _>("NormalizedCache"));
294 let extension = extension.as_ref();
295
296 if should_cache::<Q>(&operation) {
297 self.write_updater::<Q>(&operation);
298 let cached = self.store.read_query::<Q>(&operation, ptr::null_mut());
299 if let Some(cached) = cached {
300 let response = OperationResult {
301 key: operation.key,
302 response: Response {
303 debug_info: Some(DebugInfo {
304 did_dedup: false,
305 source: ResultSource::Cache
306 }),
307 data: Some(cached),
308 errors: None
309 },
310 meta: operation.meta
311 };
312 Ok(response)
313 } else {
314 self.run_optimistic_query::<Q, _>(&operation, &client, extension);
315 let variables: Q::Variables = operation.query.variables.clone();
316 let res = self.next.run::<Q, _>(operation, client.clone()).await?;
317 self.write_query::<Q, _>(&res, &variables, &client)?;
318 Ok(res)
319 }
320 } else {
321 let operation_type = operation.meta.operation_type.clone();
322
323 if operation_type == OperationType::Mutation {
324 self.run_optimistic_mutation::<Q, _>(&operation, &client, extension);
325 }
326
327 let variables = operation.query.variables.clone();
328 let res = self
329 .next
330 .run::<Q, _>(operation.clone(), client.clone())
331 .await?;
332 if operation_type == OperationType::Mutation {
333 self.invalidate_and_update::<Q, _>(&res, variables, &client, extension);
334 }
335 Ok(res)
336 }
337 }
338}