artemis_normalized_cache/cache_exchange/
mod.rs

1//! Contains the exchange factory and implementation. The factory is the only thing needed for most
2//! users and is reexported from the root.
3
4use crate::{
5    store::Store,
6    types::{NormalizedCacheExtension, NormalizedCacheOptions},
7    HashSet
8};
9use artemis::{
10    exchange::{
11        Client, Exchange, ExchangeFactory, ExchangeResult, Operation, OperationResult,
12        OperationType
13    },
14    DebugInfo, GraphQLQuery, QueryError, RequestPolicy, Response, ResultSource
15};
16#[cfg(target_arch = "wasm32")]
17use parking_lot::RwLock;
18#[cfg(target_arch = "wasm32")]
19use serde::de::DeserializeOwned;
20use std::{collections::HashMap, ptr, sync::Arc};
21#[cfg(target_arch = "wasm32")]
22use wasm_bindgen::JsValue;
23
24#[cfg(test)]
25mod tests;
26
27/// The normalized cache exchange. This will store normalized queries by unique ID.
28#[derive(Default)]
29pub struct NormalizedCacheExchange {
30    options: Option<NormalizedCacheOptions>
31}
32
33impl NormalizedCacheExchange {
34    /// Create a new cache exchange with extra options.
35    #[allow(unused)]
36    pub fn with_options(options: NormalizedCacheOptions) -> Self {
37        Self {
38            options: Some(options)
39        }
40    }
41
42    /// Create a new cache exchange with default options
43    #[allow(unused)]
44    pub fn new() -> Self {
45        Self { options: None }
46    }
47}
48
49impl<TNext: Exchange> ExchangeFactory<TNext> for NormalizedCacheExchange {
50    type Output = NormalizedCacheImpl<TNext>;
51
52    fn build(self, next: TNext) -> NormalizedCacheImpl<TNext> {
53        let options = self.options.unwrap_or_else(NormalizedCacheOptions::default);
54        let store = Store::new(options.custom_keys.unwrap_or_else(HashMap::default));
55        NormalizedCacheImpl {
56            next,
57            store: Arc::new(store),
58            #[cfg(target_arch = "wasm32")]
59            updaters: HashMap::new()
60        }
61    }
62}
63
64/// The implementation of the normalized cache. Exposed in case someone needs it, but most users
65/// shouldn't.
66pub struct NormalizedCacheImpl<TNext: Exchange> {
67    next: TNext,
68    store: Arc<Store>,
69    #[cfg(target_arch = "wasm32")]
70    updaters: Arc<RwLock<HashMap<u64, Box<dyn Fn(JsValue, js_sys::Function, *mut usize)>>>>
71}
72
73fn should_cache<Q: GraphQLQuery>(operation: &Operation<Q::Variables>) -> bool {
74    operation.meta.operation_type == OperationType::Query
75        && operation.options.request_policy != RequestPolicy::NetworkOnly
76}
77
78fn is_optimistic_mutation<Q: GraphQLQuery>(op: &Operation<Q::Variables>) -> bool {
79    op.meta.operation_type == OperationType::Mutation
80        && op.options.request_policy != RequestPolicy::NetworkOnly
81}
82
83impl<TNext: Exchange> NormalizedCacheImpl<TNext> {
84    #[cfg(target_arch = "wasm32")]
85    fn write_updater<Q: GraphQLQuery>(&self, operation: &Operation<Q::Variables>)
86    where
87        Q::Variables: DeserializeOwned
88    {
89        if self.updaters.read().contains_key(&operation.key) {
90            let updater = {
91                let store = self.store.clone();
92                move |variables: Value, updater_fn: js_sys::Function, dependencies: *mut usize| {
93                    store.update_query_js::<Q>(variables, updater_fn, dependencies);
94                }
95            };
96            self.updaters
97                .write()
98                .insert(operation.key, Box::new(updater));
99        }
100    }
101
102    #[cfg(not(target_arch = "wasm32"))]
103    fn write_updater<Q: GraphQLQuery>(&self, _operation: &Operation<Q::Variables>) {}
104
105    fn write_query<Q: GraphQLQuery, C: Client>(
106        &self,
107        result: &OperationResult<Q::ResponseData>,
108        variables: &Q::Variables,
109        client: &C
110    ) -> Result<(), QueryError> {
111        let query_key = result.key;
112        let mut dependencies = HashSet::default();
113        if result.response.errors.is_some() {
114            self.store.clear_optimistic_layer(query_key);
115        } else {
116            self.store
117                .write_query::<Q>(result, variables, false, &mut dependencies)?;
118            self.store.rerun_queries(dependencies, query_key, client);
119            self.store.clear_optimistic_layer(query_key);
120        }
121        Ok(())
122    }
123
124    fn invalidate_and_update<Q: GraphQLQuery, C: Client>(
125        &self,
126        result: &OperationResult<Q::ResponseData>,
127        variables: Q::Variables,
128        client: &C,
129        extension: Option<&NormalizedCacheExtension>
130    ) {
131        let query_key = result.key;
132        let mut dependencies = HashSet::default();
133        self.store
134            .write_query::<Q>(result, &variables, false, &mut dependencies)
135            .unwrap();
136        self.store
137            .invalidate_query::<Q>(result, &variables, false, &mut dependencies);
138        if let Some(updater) = extension.and_then(|ext| ext.update.as_ref()) {
139            updater(
140                &result.response.data,
141                self.store.clone().into(),
142                &mut dependencies
143            );
144        } else {
145            self.update_js::<Q>(extension, result.response.data.as_ref(), &mut dependencies);
146        }
147        println!("Invalidated dependencies {:?}", dependencies);
148        self.store.rerun_queries(dependencies, query_key, client);
149    }
150
151    #[cfg(target_arch = "wasm32")]
152    fn update_js<Q: GraphQLQuery>(
153        &self,
154        extension: Option<&NormalizedCacheExtension>,
155        data: Option<&Q::ResponseData>,
156        dependencies: &mut Vec<String>
157    ) {
158        if let Some(updater) = extension.and_then(|ext| ext.update_js) {
159            let this = JsValue::NULL;
160            let data = serde_wasm_bindgen::to_value(data).unwrap();
161            let dependencies = dependencies as *mut _ as *mut usize;
162            updater.call3(
163                &this,
164                &data,
165                &|query: u64,
166                  variables: JsValue,
167                  updater: js_sys::Function,
168                  dependencies: *mut usize| {
169                    let updaters = self.updaters.read();
170                    let updater_fn = updaters.get(&query);
171                    if let Some(updater_fn) = updater_fn {
172                        updater_fn(variables, updater, dependencies);
173                    }
174                },
175                dependencies
176            );
177        }
178    }
179
180    #[cfg(not(target_arch = "wasm32"))]
181    fn update_js<Q: GraphQLQuery>(
182        &self,
183        _extension: Option<&NormalizedCacheExtension>,
184        _data: Option<&Q::ResponseData>,
185        _dependencies: &mut HashSet<String>
186    ) {
187    }
188
189    fn run_optimistic_query<Q: GraphQLQuery, C: Client>(
190        &self,
191        operation: &Operation<Q::Variables>,
192        client: &C,
193        extension: Option<&NormalizedCacheExtension>
194    ) {
195        if operation.options.request_policy != RequestPolicy::NetworkOnly {
196            let variables = &operation.query.variables;
197            let data = extension
198                .and_then(|ext| ext.optimistic_result.as_ref())
199                .and_then(|resolver| resolver())
200                .and_then(|result| result.downcast::<Q::ResponseData>().ok())
201                .map(|result| *result);
202
203            if let Some(data) = data {
204                let result = OperationResult {
205                    key: operation.key,
206                    meta: operation.meta.clone(),
207                    response: Response {
208                        data: Some(data),
209                        debug_info: None,
210                        errors: None
211                    }
212                };
213
214                let query_key = operation.key;
215
216                let mut dependencies = HashSet::default();
217                self.store
218                    .write_query::<Q>(&result, variables, true, &mut dependencies)
219                    .unwrap();
220
221                println!("Optimistic dependencies: {:?}", dependencies);
222                self.store.rerun_queries(dependencies, query_key, client);
223                client.push_result(operation.key, Ok(result))
224            }
225        }
226    }
227
228    fn run_optimistic_mutation<Q: GraphQLQuery, C: Client>(
229        &self,
230        operation: &Operation<Q::Variables>,
231        client: &C,
232        extension: Option<&NormalizedCacheExtension>
233    ) {
234        if is_optimistic_mutation::<Q>(operation) {
235            let variables = &operation.query.variables;
236            let data = extension
237                .and_then(|extension: &NormalizedCacheExtension| {
238                    extension.optimistic_result.as_ref()
239                })
240                .and_then(|resolver| resolver())
241                .and_then(|result| result.downcast::<Q::ResponseData>().ok())
242                .map(|result| *result);
243
244            if let Some(data) = data {
245                let result = OperationResult {
246                    key: operation.key,
247                    meta: operation.meta.clone(),
248                    response: Response {
249                        data: Some(data),
250                        debug_info: None,
251                        errors: None
252                    }
253                };
254
255                let query_key = operation.key;
256                let mut dependencies = HashSet::default();
257                self.store
258                    .invalidate_query::<Q>(&result, variables, true, &mut dependencies);
259                self.store
260                    .write_query::<Q>(&result, variables, true, &mut dependencies)
261                    .unwrap();
262                if let Some(updater) = extension.and_then(|ext| ext.update.as_ref()) {
263                    updater(
264                        &result.response.data,
265                        self.store.clone().into(),
266                        &mut dependencies
267                    );
268                } else {
269                    self.update_js::<Q>(
270                        extension,
271                        result.response.data.as_ref(),
272                        &mut dependencies
273                    );
274                }
275                self.store.rerun_queries(dependencies, query_key, client);
276                client.push_result(operation.key, Ok(result))
277            }
278        }
279    }
280}
281
282#[async_trait]
283impl<TNext: Exchange> Exchange for NormalizedCacheImpl<TNext> {
284    async fn run<Q: GraphQLQuery, C: Client>(
285        &self,
286        operation: Operation<Q::Variables>,
287        client: C
288    ) -> ExchangeResult<Q::ResponseData> {
289        let extension = operation
290            .options
291            .extensions
292            .as_ref()
293            .and_then(|ext| ext.get::<NormalizedCacheExtension, _>("NormalizedCache"));
294        let extension = extension.as_ref();
295
296        if should_cache::<Q>(&operation) {
297            self.write_updater::<Q>(&operation);
298            let cached = self.store.read_query::<Q>(&operation, ptr::null_mut());
299            if let Some(cached) = cached {
300                let response = OperationResult {
301                    key: operation.key,
302                    response: Response {
303                        debug_info: Some(DebugInfo {
304                            did_dedup: false,
305                            source: ResultSource::Cache
306                        }),
307                        data: Some(cached),
308                        errors: None
309                    },
310                    meta: operation.meta
311                };
312                Ok(response)
313            } else {
314                self.run_optimistic_query::<Q, _>(&operation, &client, extension);
315                let variables: Q::Variables = operation.query.variables.clone();
316                let res = self.next.run::<Q, _>(operation, client.clone()).await?;
317                self.write_query::<Q, _>(&res, &variables, &client)?;
318                Ok(res)
319            }
320        } else {
321            let operation_type = operation.meta.operation_type.clone();
322
323            if operation_type == OperationType::Mutation {
324                self.run_optimistic_mutation::<Q, _>(&operation, &client, extension);
325            }
326
327            let variables = operation.query.variables.clone();
328            let res = self
329                .next
330                .run::<Q, _>(operation.clone(), client.clone())
331                .await?;
332            if operation_type == OperationType::Mutation {
333                self.invalidate_and_update::<Q, _>(&res, variables, &client, extension);
334            }
335            Ok(res)
336        }
337    }
338}