1use crate::{Mutation, MutationState, Query, QueryClient, QueryError, QueryState, RetryConfig};
4use gpui::Context;
5use std::future::Future;
6
7pub fn spawn_query<T, V>(
33 cx: &mut Context<V>,
34 client: &QueryClient,
35 query: &Query<T>,
36 on_complete: impl FnOnce(&mut V, QueryState<T>, &mut Context<V>) + 'static,
37) where
38 T: Clone + Send + Sync + std::fmt::Debug + 'static,
39 V: 'static,
40{
41 let key = query.key.clone();
42 let fetch_fn = query.fetch_fn.clone();
43 let options = query.options.clone();
44 let client = client.clone();
45
46 if client.is_in_flight(&key) {
48 tracing::trace!(
49 target: "rs_query",
50 query_key = %key.cache_key(),
51 "Query already in flight, skipping duplicate request"
52 );
53 return;
54 }
55
56 tracing::debug!(
57 target: "rs_query",
58 query_key = %key.cache_key(),
59 stale_time_ms = ?options.stale_time.as_millis(),
60 max_retries = options.retry.max_retries,
61 "Starting query"
62 );
63
64 client.set_in_flight(&key, true);
65
66 let retry_config = options.retry.clone();
67 let key_for_task = key.cache_key();
68
69 let task = cx.background_executor().spawn(async move {
70 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
71 rt.block_on(async { execute_with_retry(&*fetch_fn, &retry_config, &key_for_task).await })
72 });
73
74 let key_for_cleanup = key.clone();
75 let client_for_cleanup = client.clone();
76
77 cx.spawn(async move |this, cx| {
78 let result = task.await;
79 client_for_cleanup.set_in_flight(&key_for_cleanup, false);
80
81 let state = match result {
82 Ok(data) => {
83 tracing::debug!(
84 target: "rs_query",
85 query_key = %key_for_cleanup.cache_key(),
86 "Query completed successfully"
87 );
88 client_for_cleanup.set_query_data(&key_for_cleanup, data.clone(), options);
89 QueryState::Success(data)
90 }
91 Err(e) => {
92 tracing::warn!(
93 target: "rs_query",
94 query_key = %key_for_cleanup.cache_key(),
95 error = %e,
96 "Query failed"
97 );
98 QueryState::Error {
99 error: e,
100 stale_data: client_for_cleanup.get_query_data(&key_for_cleanup),
101 }
102 }
103 };
104
105 let _ = this.update(cx, |this, cx| {
106 on_complete(this, state, cx);
107 });
108 })
109 .detach();
110}
111
112pub fn spawn_mutation<T, P, V>(
131 cx: &mut Context<V>,
132 client: &QueryClient,
133 mutation: &Mutation<T, P>,
134 params: P,
135 on_complete: impl FnOnce(&mut V, MutationState<T>, &mut Context<V>) + 'static,
136) where
137 T: Clone + Send + Sync + std::fmt::Debug + 'static,
138 P: Clone + Send + std::fmt::Debug + 'static,
139 V: 'static,
140{
141 let mutate_fn = mutation.mutate_fn.clone();
142 let invalidate_keys = mutation.invalidate_keys.clone();
143 let client = client.clone();
144
145 tracing::debug!(
146 target: "rs_query",
147 invalidate_keys = ?invalidate_keys.iter().map(|k| k.cache_key()).collect::<Vec<_>>(),
148 "Starting mutation"
149 );
150
151 let task = cx
152 .background_executor()
153 .spawn(async move {
154 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
155 rt.block_on(async { (mutate_fn)(params).await })
156 });
157
158 cx.spawn(async move |this, cx| {
159 let result = task.await;
160
161 let state = match result {
162 Ok(data) => {
163 tracing::debug!(
164 target: "rs_query",
165 invalidated_count = invalidate_keys.len(),
166 "Mutation completed successfully"
167 );
168 for key in &invalidate_keys {
170 client.invalidate_queries(key);
171 }
172 MutationState::Success(data)
173 }
174 Err(e) => {
175 tracing::warn!(
176 target: "rs_query",
177 error = %e,
178 "Mutation failed"
179 );
180 MutationState::Error(e)
181 }
182 };
183
184 let _ = this.update(cx, |this, cx| {
185 on_complete(this, state, cx);
186 });
187 })
188 .detach();
189}
190
191async fn execute_with_retry<T, F>(
193 fetch_fn: &F,
194 retry_config: &RetryConfig,
195 query_key: &str,
196) -> Result<T, QueryError>
197where
198 F: Fn() -> std::pin::Pin<Box<dyn Future<Output = Result<T, QueryError>> + Send>> + ?Sized,
199{
200 let mut attempts = 0;
201 let mut last_error = None;
202
203 while attempts <= retry_config.max_retries {
204 match fetch_fn().await {
205 Ok(data) => {
206 if attempts > 0 {
207 tracing::debug!(
208 target: "rs_query",
209 query_key = %query_key,
210 attempts = attempts + 1,
211 "Query succeeded after retry"
212 );
213 }
214 return Ok(data);
215 }
216 Err(e) => {
217 if !e.is_retryable() || attempts >= retry_config.max_retries {
218 if attempts > 0 {
219 tracing::debug!(
220 target: "rs_query",
221 query_key = %query_key,
222 attempts = attempts + 1,
223 error = %e,
224 "Query failed after all retries"
225 );
226 }
227 return Err(e);
228 }
229
230 attempts += 1;
231 let delay = std::cmp::min(
232 retry_config.base_delay * 2u32.pow(attempts - 1),
233 retry_config.max_delay,
234 );
235
236 tracing::debug!(
237 target: "rs_query",
238 query_key = %query_key,
239 attempt = attempts,
240 max_retries = retry_config.max_retries,
241 delay_ms = delay.as_millis(),
242 error = %e,
243 "Retrying query after error"
244 );
245
246 last_error = Some(e);
247 tokio::time::sleep(delay).await;
248 }
249 }
250 }
251
252 Err(last_error.unwrap_or(QueryError::Custom("Max retries exceeded".into())))
253}