dioxus_core/
tasks.rs

1use crate::innerlude::Effect;
2use crate::innerlude::ScopeOrder;
3use crate::innerlude::{remove_future, spawn, Runtime};
4use crate::scope_context::ScopeStatus;
5use crate::scope_context::SuspenseLocation;
6use crate::ScopeId;
7use futures_util::task::ArcWake;
8use slotmap::DefaultKey;
9use std::marker::PhantomData;
10use std::panic;
11use std::sync::Arc;
12use std::task::Waker;
13use std::{cell::Cell, future::Future};
14use std::{cell::RefCell, rc::Rc};
15use std::{pin::Pin, task::Poll};
16
17/// A task's unique identifier.
18///
19/// `Task` is a unique identifier for a task that has been spawned onto the runtime. It can be used to cancel the task
20#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
21#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
22pub struct Task {
23    pub(crate) id: slotmap::DefaultKey,
24    // We add a raw pointer to make this !Send + !Sync
25    unsend: PhantomData<*const ()>,
26}
27
28impl Task {
29    /// Create a task from a raw id
30    pub(crate) const fn from_id(id: slotmap::DefaultKey) -> Self {
31        Self {
32            id,
33            unsend: PhantomData,
34        }
35    }
36
37    /// Start a new future on the same thread as the rest of the VirtualDom.
38    ///
39    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
40    /// and long running tasks.
41    ///
42    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
43    ///
44    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
45    /// will only occur when the VirtualDom itself has been dropped.
46    pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
47        spawn(task)
48    }
49
50    /// Drop the task immediately.
51    ///
52    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
53    pub fn cancel(self) {
54        remove_future(self);
55    }
56
57    /// Pause the task.
58    pub fn pause(&self) {
59        self.set_active(false);
60    }
61
62    /// Resume the task.
63    pub fn resume(&self) {
64        self.set_active(true);
65    }
66
67    /// Check if the task is paused.
68    pub fn paused(&self) -> bool {
69        Runtime::with(|rt| {
70            if let Some(task) = rt.tasks.borrow().get(self.id) {
71                !task.active.get()
72            } else {
73                false
74            }
75        })
76        .unwrap_or_default()
77    }
78
79    /// Wake the task.
80    #[track_caller]
81    pub fn wake(&self) {
82        Runtime::with(|rt| {
83            _ = rt
84                .sender
85                .unbounded_send(SchedulerMsg::TaskNotified(self.id))
86        })
87        .unwrap_or_else(|e| panic!("{}", e))
88    }
89
90    /// Poll the task immediately.
91    #[track_caller]
92    pub fn poll_now(&self) -> Poll<()> {
93        Runtime::with(|rt| rt.handle_task_wakeup(*self)).unwrap_or_else(|e| panic!("{}", e))
94    }
95
96    /// Set the task as active or paused.
97    #[track_caller]
98    pub fn set_active(&self, active: bool) {
99        Runtime::with(|rt| {
100            if let Some(task) = rt.tasks.borrow().get(self.id) {
101                let was_active = task.active.replace(active);
102                if !was_active && active {
103                    _ = rt
104                        .sender
105                        .unbounded_send(SchedulerMsg::TaskNotified(self.id));
106                }
107            }
108        })
109        .unwrap_or_else(|e| panic!("{}", e))
110    }
111}
112
113impl Runtime {
114    /// Start a new future on the same thread as the rest of the VirtualDom.
115    ///
116    /// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
117    ///
118    /// This future will not contribute to suspense resolving but it will run during suspense.
119    ///
120    /// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
121    ///
122    /// ```rust, no_run
123    /// # use dioxus::prelude::*;
124    /// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
125    /// let mut state = use_signal(|| None);
126    /// spawn_isomorphic(async move {
127    ///     state.set(Some(reqwest::get("https://api.example.com").await));
128    /// });
129    ///
130    /// // ✅ You may wait for a signal to change and then log it
131    /// let mut state = use_signal(|| 0);
132    /// spawn_isomorphic(async move {
133    ///     loop {
134    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
135    ///         println!("State is {state}");
136    ///     }
137    /// });
138    /// ```
139    pub fn spawn_isomorphic(
140        &self,
141        scope: ScopeId,
142        task: impl Future<Output = ()> + 'static,
143    ) -> Task {
144        self.spawn_task_of_type(scope, task, TaskType::Isomorphic)
145    }
146
147    /// Start a new future on the same thread as the rest of the VirtualDom.
148    ///
149    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
150    /// and long running tasks.
151    ///
152    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
153    ///
154    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
155    /// will only occur when the VirtualDom itself has been dropped.
156    pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
157        self.spawn_task_of_type(scope, task, TaskType::ClientOnly)
158    }
159
160    fn spawn_task_of_type(
161        &self,
162        scope: ScopeId,
163        task: impl Future<Output = ()> + 'static,
164        ty: TaskType,
165    ) -> Task {
166        self.spawn_task_of_type_inner(scope, Box::pin(task), ty)
167    }
168
169    // a non-momorphic version of spawn_task_of_type, helps with binari sizes
170    fn spawn_task_of_type_inner(
171        &self,
172        scope: ScopeId,
173        pinned_task: Pin<Box<dyn Future<Output = ()>>>,
174        ty: TaskType,
175    ) -> Task {
176        // Insert the task, temporarily holding a borrow on the tasks map
177        let (task, task_id) = {
178            let mut tasks = self.tasks.borrow_mut();
179
180            let mut task_id = Task::from_id(DefaultKey::default());
181            let mut local_task = None;
182            tasks.insert_with_key(|key| {
183                task_id = Task::from_id(key);
184
185                let new_task = Rc::new(LocalTask {
186                    scope,
187                    active: Cell::new(true),
188                    parent: self.current_task(),
189                    task: RefCell::new(pinned_task),
190                    waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
191                        id: task_id.id,
192                        tx: self.sender.clone(),
193                    })),
194                    ty: RefCell::new(ty),
195                });
196
197                local_task = Some(new_task.clone());
198
199                new_task
200            });
201
202            (local_task.unwrap(), task_id)
203        };
204
205        // Get a borrow on the task, holding no borrows on the tasks map
206        debug_assert!(self.tasks.try_borrow_mut().is_ok());
207        debug_assert!(task.task.try_borrow_mut().is_ok());
208
209        self.sender
210            .unbounded_send(SchedulerMsg::TaskNotified(task_id.id))
211            .expect("Scheduler should exist");
212
213        task_id
214    }
215
216    /// Queue an effect to run after the next render
217    pub(crate) fn queue_effect(&self, id: ScopeId, f: impl FnOnce() + 'static) {
218        let effect = Box::new(f) as Box<dyn FnOnce() + 'static>;
219        let Some(scope) = self.get_state(id) else {
220            return;
221        };
222        let mut status = scope.status.borrow_mut();
223        match &mut *status {
224            ScopeStatus::Mounted => {
225                self.queue_effect_on_mounted_scope(id, effect);
226            }
227            ScopeStatus::Unmounted { effects_queued, .. } => {
228                effects_queued.push(effect);
229            }
230        }
231    }
232
233    /// Queue an effect to run after the next render without checking if the scope is mounted
234    pub(crate) fn queue_effect_on_mounted_scope(
235        &self,
236        id: ScopeId,
237        f: Box<dyn FnOnce() + 'static>,
238    ) {
239        // Add the effect to the queue of effects to run after the next render for the given scope
240        let mut effects = self.pending_effects.borrow_mut();
241        let scope_order = ScopeOrder::new(id.height(), id);
242        match effects.get(&scope_order) {
243            Some(effects) => effects.push_back(f),
244            None => {
245                effects.insert(Effect::new(scope_order, f));
246            }
247        }
248    }
249
250    /// Get the currently running task
251    pub fn current_task(&self) -> Option<Task> {
252        self.current_task.get()
253    }
254
255    /// Get the parent task of the given task, if it exists
256    pub fn parent_task(&self, task: Task) -> Option<Task> {
257        self.tasks.borrow().get(task.id)?.parent
258    }
259
260    pub(crate) fn task_scope(&self, task: Task) -> Option<ScopeId> {
261        self.tasks.borrow().get(task.id).map(|t| t.scope)
262    }
263
264    #[track_caller]
265    pub(crate) fn handle_task_wakeup(&self, id: Task) -> Poll<()> {
266        #[cfg(debug_assertions)]
267        {
268            // Ensure we are currently inside a `Runtime`.
269            Runtime::current().unwrap_or_else(|e| panic!("{}", e));
270        }
271
272        let task = self.tasks.borrow().get(id.id).cloned();
273
274        // The task was removed from the scheduler, so we can just ignore it
275        let Some(task) = task else {
276            return Poll::Ready(());
277        };
278
279        // If a task woke up but is paused, we can just ignore it
280        if !task.active.get() {
281            return Poll::Pending;
282        }
283
284        let mut cx = std::task::Context::from_waker(&task.waker);
285
286        // poll the future with the scope on the stack
287        let poll_result = self.with_scope_on_stack(task.scope, || {
288            self.current_task.set(Some(id));
289
290            let poll_result = task.task.borrow_mut().as_mut().poll(&mut cx);
291
292            if poll_result.is_ready() {
293                // Remove it from the scope so we dont try to double drop it when the scope dropes
294                self.get_state(task.scope)
295                    .unwrap()
296                    .spawned_tasks
297                    .borrow_mut()
298                    .remove(&id);
299
300                self.remove_task(id);
301            }
302
303            poll_result
304        });
305        self.current_task.set(None);
306
307        poll_result
308    }
309
310    /// Drop the future with the given Task
311    ///
312    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
313    pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
314        // Remove the task from the task list
315        let task = self.tasks.borrow_mut().remove(id.id);
316
317        if let Some(task) = &task {
318            // Remove the task from suspense
319            if let TaskType::Suspended { boundary } = &*task.ty.borrow() {
320                self.suspended_tasks.set(self.suspended_tasks.get() - 1);
321                if let SuspenseLocation::UnderSuspense(boundary) = boundary {
322                    boundary.remove_suspended_task(id);
323                }
324            }
325
326            // Remove the task from pending work. We could reuse the slot before the task is polled and discarded so we need to remove it from pending work instead of filtering out dead tasks when we try to poll them
327            if let Some(scope) = self.get_state(task.scope) {
328                let order = ScopeOrder::new(scope.height(), scope.id);
329                if let Some(dirty_tasks) = self.dirty_tasks.borrow_mut().get(&order) {
330                    dirty_tasks.remove(id);
331                }
332            }
333        }
334
335        task
336    }
337
338    /// Check if a task should be run during suspense
339    pub(crate) fn task_runs_during_suspense(&self, task: Task) -> bool {
340        let borrow = self.tasks.borrow();
341        let task: Option<&LocalTask> = borrow.get(task.id).map(|t| &**t);
342        matches!(task, Some(LocalTask { ty, .. }) if ty.borrow().runs_during_suspense())
343    }
344}
345
346/// the task itself is the waker
347pub(crate) struct LocalTask {
348    scope: ScopeId,
349    parent: Option<Task>,
350    task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
351    waker: Waker,
352    ty: RefCell<TaskType>,
353    active: Cell<bool>,
354}
355
356impl LocalTask {
357    /// Suspend the task, returns true if the task was already suspended
358    pub(crate) fn suspend(&self, boundary: SuspenseLocation) -> bool {
359        // Make this a suspended task so it runs during suspense
360        let old_type = self.ty.replace(TaskType::Suspended { boundary });
361        matches!(old_type, TaskType::Suspended { .. })
362    }
363}
364
365#[derive(Clone)]
366enum TaskType {
367    ClientOnly,
368    Suspended { boundary: SuspenseLocation },
369    Isomorphic,
370}
371
372impl TaskType {
373    fn runs_during_suspense(&self) -> bool {
374        matches!(self, TaskType::Isomorphic | TaskType::Suspended { .. })
375    }
376}
377
378/// The type of message that can be sent to the scheduler.
379///
380/// These messages control how the scheduler will process updates to the UI.
381#[derive(Debug)]
382pub(crate) enum SchedulerMsg {
383    /// All components have been marked as dirty, requiring a full render
384    AllDirty,
385
386    /// Immediate updates from Components that mark them as dirty
387    Immediate(ScopeId),
388
389    /// A task has woken and needs to be progressed
390    TaskNotified(slotmap::DefaultKey),
391
392    /// An effect has been queued to run after the next render
393    EffectQueued,
394}
395
396struct LocalTaskHandle {
397    id: slotmap::DefaultKey,
398    tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
399}
400
401impl ArcWake for LocalTaskHandle {
402    fn wake_by_ref(arc_self: &Arc<Self>) {
403        _ = arc_self
404            .tx
405            .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
406    }
407}