rs_query/
executor.rs

1//! GPUI async execution helpers
2
3use crate::{Mutation, MutationState, Query, QueryClient, QueryError, QueryState, RetryConfig};
4use gpui::Context;
5use std::future::Future;
6
7/// Execute a query using GPUI's executor.
8///
9/// This is the main entry point for running queries. It:
10/// - Deduplicates in-flight requests
11/// - Handles retries with exponential backoff
12/// - Caches results automatically
13/// - Calls your callback with the result
14///
15/// # Example
16///
17/// ```rust,ignore
18/// spawn_query(cx, &self.query_client, &users_query, |this, state, cx| {
19///     match state {
20///         QueryState::Success(users) => {
21///             this.users = users;
22///         }
23///         QueryState::Error { error, stale_data } => {
24///             this.error = Some(error.to_string());
25///             // Can still show stale_data if available
26///         }
27///         _ => {}
28///     }
29///     cx.notify();
30/// });
31/// ```
32pub fn spawn_query<T, V>(
33    cx: &mut Context<V>,
34    client: &QueryClient,
35    query: &Query<T>,
36    on_complete: impl FnOnce(&mut V, QueryState<T>, &mut Context<V>) + 'static,
37) where
38    T: Clone + Send + Sync + std::fmt::Debug + 'static,
39    V: 'static,
40{
41    let key = query.key.clone();
42    let fetch_fn = query.fetch_fn.clone();
43    let options = query.options.clone();
44    let client = client.clone();
45
46    // Check deduplication
47    if client.is_in_flight(&key) {
48        tracing::trace!(
49            target: "rs_query",
50            query_key = %key.cache_key(),
51            "Query already in flight, skipping duplicate request"
52        );
53        return;
54    }
55
56    tracing::debug!(
57        target: "rs_query",
58        query_key = %key.cache_key(),
59        stale_time_ms = ?options.stale_time.as_millis(),
60        max_retries = options.retry.max_retries,
61        "Starting query"
62    );
63
64    client.set_in_flight(&key, true);
65
66    let retry_config = options.retry.clone();
67    let key_for_task = key.cache_key();
68
69    let task = cx.background_executor().spawn(async move {
70        let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
71        rt.block_on(async { execute_with_retry(&*fetch_fn, &retry_config, &key_for_task).await })
72    });
73
74    let key_for_cleanup = key.clone();
75    let client_for_cleanup = client.clone();
76
77    cx.spawn(async move |this, cx| {
78        let result = task.await;
79        client_for_cleanup.set_in_flight(&key_for_cleanup, false);
80
81        let state = match result {
82            Ok(data) => {
83                tracing::debug!(
84                    target: "rs_query",
85                    query_key = %key_for_cleanup.cache_key(),
86                    "Query completed successfully"
87                );
88                client_for_cleanup.set_query_data(&key_for_cleanup, data.clone(), options);
89                QueryState::Success(data)
90            }
91            Err(e) => {
92                tracing::warn!(
93                    target: "rs_query",
94                    query_key = %key_for_cleanup.cache_key(),
95                    error = %e,
96                    "Query failed"
97                );
98                QueryState::Error {
99                    error: e,
100                    stale_data: client_for_cleanup.get_query_data(&key_for_cleanup),
101                }
102            }
103        };
104
105        let _ = this.update(cx, |this, cx| {
106            on_complete(this, state, cx);
107        });
108    })
109    .detach();
110}
111
112/// Execute a mutation with automatic cache invalidation.
113///
114/// # Example
115///
116/// ```rust,ignore
117/// spawn_mutation(cx, &self.query_client, &create_user_mutation, params, |this, state, cx| {
118///     match state {
119///         MutationState::Success(user) => {
120///             this.show_success("User created!");
121///         }
122///         MutationState::Error(e) => {
123///             this.show_error(e.to_string());
124///         }
125///         _ => {}
126///     }
127///     cx.notify();
128/// });
129/// ```
130pub fn spawn_mutation<T, P, V>(
131    cx: &mut Context<V>,
132    client: &QueryClient,
133    mutation: &Mutation<T, P>,
134    params: P,
135    on_complete: impl FnOnce(&mut V, MutationState<T>, &mut Context<V>) + 'static,
136) where
137    T: Clone + Send + Sync + std::fmt::Debug + 'static,
138    P: Clone + Send + std::fmt::Debug + 'static,
139    V: 'static,
140{
141    let mutate_fn = mutation.mutate_fn.clone();
142    let invalidate_keys = mutation.invalidate_keys.clone();
143    let client = client.clone();
144
145    tracing::debug!(
146        target: "rs_query",
147        invalidate_keys = ?invalidate_keys.iter().map(|k| k.cache_key()).collect::<Vec<_>>(),
148        "Starting mutation"
149    );
150
151    let task = cx
152        .background_executor()
153        .spawn(async move {
154            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
155            rt.block_on(async { (mutate_fn)(params).await })
156        });
157
158    cx.spawn(async move |this, cx| {
159        let result = task.await;
160
161        let state = match result {
162            Ok(data) => {
163                tracing::debug!(
164                    target: "rs_query",
165                    invalidated_count = invalidate_keys.len(),
166                    "Mutation completed successfully"
167                );
168                // Invalidate queries
169                for key in &invalidate_keys {
170                    client.invalidate_queries(key);
171                }
172                MutationState::Success(data)
173            }
174            Err(e) => {
175                tracing::warn!(
176                    target: "rs_query",
177                    error = %e,
178                    "Mutation failed"
179                );
180                MutationState::Error(e)
181            }
182        };
183
184        let _ = this.update(cx, |this, cx| {
185            on_complete(this, state, cx);
186        });
187    })
188    .detach();
189}
190
191/// Execute with retry logic
192async fn execute_with_retry<T, F>(
193    fetch_fn: &F,
194    retry_config: &RetryConfig,
195    query_key: &str,
196) -> Result<T, QueryError>
197where
198    F: Fn() -> std::pin::Pin<Box<dyn Future<Output = Result<T, QueryError>> + Send>> + ?Sized,
199{
200    let mut attempts = 0;
201    let mut last_error = None;
202
203    while attempts <= retry_config.max_retries {
204        match fetch_fn().await {
205            Ok(data) => {
206                if attempts > 0 {
207                    tracing::debug!(
208                        target: "rs_query",
209                        query_key = %query_key,
210                        attempts = attempts + 1,
211                        "Query succeeded after retry"
212                    );
213                }
214                return Ok(data);
215            }
216            Err(e) => {
217                if !e.is_retryable() || attempts >= retry_config.max_retries {
218                    if attempts > 0 {
219                        tracing::debug!(
220                            target: "rs_query",
221                            query_key = %query_key,
222                            attempts = attempts + 1,
223                            error = %e,
224                            "Query failed after all retries"
225                        );
226                    }
227                    return Err(e);
228                }
229
230                attempts += 1;
231                let delay = std::cmp::min(
232                    retry_config.base_delay * 2u32.pow(attempts - 1),
233                    retry_config.max_delay,
234                );
235
236                tracing::debug!(
237                    target: "rs_query",
238                    query_key = %query_key,
239                    attempt = attempts,
240                    max_retries = retry_config.max_retries,
241                    delay_ms = delay.as_millis(),
242                    error = %e,
243                    "Retrying query after error"
244                );
245
246                last_error = Some(e);
247                tokio::time::sleep(delay).await;
248            }
249        }
250    }
251
252    Err(last_error.unwrap_or(QueryError::Custom("Max retries exceeded".into())))
253}