1use crate::{
2 as_rc, client::QueryOptions, AsKeys, DataSignal, Fetcher, QueryClient, QueryData, Status,
3};
4use fluvio_wasm_timer::Delay;
5use std::any::Any;
6use std::{future::Future, rc::Rc};
7use sycamore::{
8 futures::spawn_local,
9 reactive::{
10 create_effect, create_memo, create_rc_signal, create_ref, create_selector, use_context,
11 ReadSignal, Scope, Signal,
12 },
13};
14
15pub struct Query<'a, T, E, F: Fn()> {
35 pub data: &'a ReadSignal<QueryData<Rc<T>, Rc<E>>>,
37 pub status: Rc<Signal<Status>>,
39 pub refetch: &'a F,
42}
43
44impl QueryClient {
45 pub(crate) fn find_query(
46 &self,
47 key: &[u64],
48 new_hook: bool,
49 ) -> Option<(Rc<DataSignal>, Rc<Signal<Status>>, Fetcher)> {
50 let data = self.data_signals.read().unwrap().get(key);
51 let status = self.status_signals.read().unwrap().get(key);
52 let fetcher = self.fetchers.read().unwrap().get(key)?.clone();
53 let (data, status) = match (data, status) {
54 (None, None) => None,
55 (None, Some(status)) => {
56 let data = if let Some(data) = self.cache.read().unwrap().get(key) {
57 QueryData::Ok(data)
58 } else {
59 QueryData::Loading
60 };
61 let data = as_rc(create_rc_signal(data));
62 if new_hook {
63 self.data_signals
64 .write()
65 .unwrap()
66 .insert(key.to_vec(), data.clone());
67 }
68 Some((data, status))
69 }
70 (Some(data), None) => {
71 let status = as_rc(create_rc_signal(Status::Success));
72 if new_hook {
73 self.status_signals
74 .write()
75 .unwrap()
76 .insert(key.to_vec(), status.clone());
77 }
78 Some((data, status))
79 }
80 (Some(data), Some(status)) => Some((data, status)),
81 }?;
82 Some((data, status, fetcher))
83 }
84
85 pub(crate) fn insert_query(
86 &self,
87 key: Vec<u64>,
88 data: Rc<DataSignal>,
89 status: Rc<Signal<Status>>,
90 fetcher: Fetcher,
91 ) {
92 self.data_signals.write().unwrap().insert(key.clone(), data);
93 self.status_signals
94 .write()
95 .unwrap()
96 .insert(key.clone(), status);
97 self.fetchers.write().unwrap().insert(key, fetcher);
98 }
99
100 pub(crate) fn run_query(
101 self: Rc<Self>,
102 key: &[u64],
103 data: Rc<DataSignal>,
104 status: Rc<Signal<Status>>,
105 fetcher: Fetcher,
106 options: &QueryOptions,
107 ) {
108 let options = self.default_options.merge(options);
109 if let Some(cached) = {
110 let cache = self.cache.read().unwrap();
111 cache.get(key)
112 } {
113 data.set(QueryData::Ok(cached));
114 self.clone().invalidate_queries(vec![key.to_vec()]);
115 } else if *status.get_untracked() != Status::Fetching {
116 status.set(Status::Fetching);
117 let key = key.to_vec();
118 spawn_local(async move {
119 let mut res = fetcher().await;
120 let mut retries = 0;
121 while res.is_err() && retries < options.retries {
122 Delay::new((options.retry_fn)(retries)).await.unwrap();
123 res = fetcher().await;
124 retries += 1;
125 }
126 data.set(res.map_or_else(QueryData::Err, QueryData::Ok));
127 if let QueryData::Ok(data) = data.get_untracked().as_ref() {
128 self.cache
129 .write()
130 .unwrap()
131 .insert(key, data.clone(), &options);
132 }
133 status.set(Status::Success);
134 });
135 }
136 }
137
138 pub(crate) fn refetch_query(self: Rc<Self>, key: &[u64]) {
139 self.invalidate_queries(vec![key.to_vec()]);
140 }
141}
142
143pub fn use_query<'a, K, T, E, F, R>(
195 cx: Scope<'a>,
196 key: K,
197 fetcher: F,
198) -> Query<'a, T, E, impl Fn() + 'a>
199where
200 K: AsKeys + 'a,
201 F: Fn() -> R + 'static,
202 R: Future<Output = Result<T, E>> + 'static,
203 T: 'static,
204 E: 'static,
205{
206 use_query_with_options(cx, key, fetcher, QueryOptions::default())
207}
208
209pub fn use_query_with_options<'a, K, T, E, F, R>(
212 cx: Scope<'a>,
213 key: K,
214 fetcher: F,
215 options: QueryOptions,
216) -> Query<'a, T, E, impl Fn() + 'a>
217where
218 K: AsKeys + 'a,
219 F: Fn() -> R + 'static,
220 R: Future<Output = Result<T, E>> + 'static,
221 T: 'static,
222 E: 'static,
223{
224 let id = create_selector(cx, move || key.as_keys());
225
226 let client = use_context::<Rc<QueryClient>>(cx).clone();
227 let (data, status, fetcher) = if let Some(query) = client.find_query(&id.get(), true) {
228 query
229 } else {
230 let data: Rc<DataSignal> = as_rc(create_rc_signal(QueryData::Loading));
231 let status = as_rc(create_rc_signal(Status::Idle));
232 let fetcher: Fetcher = Rc::new(move || {
233 let fut = fetcher();
234 Box::pin(async move {
235 fut.await
236 .map(|data| -> Rc<dyn Any> { Rc::new(data) })
237 .map_err(|err| -> Rc<dyn Any> { Rc::new(err) })
238 })
239 });
240 client.insert_query(
241 id.get().as_ref().clone(),
242 data.clone(),
243 status.clone(),
244 fetcher.clone(),
245 );
246 (data, status, fetcher)
247 };
248
249 {
250 let client = client.clone();
251 let data = data.clone();
252 let status = status.clone();
253 create_effect(cx, move || {
254 log::info!("Key changed. New key: {:?}", id.get());
255 client.clone().run_query(
256 &id.get(),
257 data.clone(),
258 status.clone(),
259 fetcher.clone(),
260 &options,
261 );
262 });
263 }
264
265 let refetch = create_ref(cx, move || {
266 client.clone().refetch_query(&id.get());
267 });
268 let data = create_memo(cx, move || match data.get().as_ref() {
269 QueryData::Loading => QueryData::Loading,
270 QueryData::Ok(data) => QueryData::Ok(data.clone().downcast().unwrap()),
271 QueryData::Err(err) => QueryData::Err(err.clone().downcast().unwrap()),
272 });
273
274 Query {
275 data,
276 status,
277 refetch,
278 }
279}