dioxus_core/
tasks.rs

1use crate::innerlude::Effect;
2use crate::innerlude::ScopeOrder;
3use crate::innerlude::{remove_future, spawn, Runtime};
4use crate::scope_context::ScopeStatus;
5use crate::scope_context::SuspenseLocation;
6use crate::ScopeId;
7use futures_util::task::ArcWake;
8use slotmap::DefaultKey;
9use std::marker::PhantomData;
10use std::panic;
11use std::sync::Arc;
12use std::task::Waker;
13use std::{cell::Cell, future::Future};
14use std::{cell::RefCell, rc::Rc};
15use std::{pin::Pin, task::Poll};
16
17/// A task's unique identifier.
18///
19/// `Task` is a unique identifier for a task that has been spawned onto the runtime. It can be used to cancel the task
20#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
21#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
22pub struct Task {
23    pub(crate) id: slotmap::DefaultKey,
24    // We add a raw pointer to make this !Send + !Sync
25    unsend: PhantomData<*const ()>,
26}
27
28impl Task {
29    /// Create a task from a raw id
30    pub(crate) const fn from_id(id: slotmap::DefaultKey) -> Self {
31        Self {
32            id,
33            unsend: PhantomData,
34        }
35    }
36
37    /// Start a new future on the same thread as the rest of the VirtualDom.
38    ///
39    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
40    /// and long running tasks.
41    ///
42    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
43    ///
44    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
45    /// will only occur when the VirtualDom itself has been dropped.
46    pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
47        spawn(task)
48    }
49
50    /// Drop the task immediately.
51    ///
52    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
53    pub fn cancel(self) {
54        remove_future(self);
55    }
56
57    /// Pause the task.
58    pub fn pause(&self) {
59        self.set_active(false);
60    }
61
62    /// Resume the task.
63    pub fn resume(&self) {
64        self.set_active(true);
65    }
66
67    /// Check if the task is paused.
68    pub fn paused(&self) -> bool {
69        Runtime::with(|rt| {
70            if let Some(task) = rt.tasks.borrow().get(self.id) {
71                !task.active.get()
72            } else {
73                false
74            }
75        })
76        .unwrap_or_default()
77    }
78
79    /// Wake the task.
80    #[track_caller]
81    pub fn wake(&self) {
82        Runtime::with(|rt| {
83            _ = rt
84                .sender
85                .unbounded_send(SchedulerMsg::TaskNotified(self.id))
86        })
87        .unwrap_or_else(|e| panic!("{}", e))
88    }
89
90    /// Poll the task immediately.
91    #[track_caller]
92    pub fn poll_now(&self) -> Poll<()> {
93        Runtime::with(|rt| rt.handle_task_wakeup(*self)).unwrap_or_else(|e| panic!("{}", e))
94    }
95
96    /// Set the task as active or paused.
97    #[track_caller]
98    pub fn set_active(&self, active: bool) {
99        Runtime::with(|rt| {
100            if let Some(task) = rt.tasks.borrow().get(self.id) {
101                let was_active = task.active.replace(active);
102                if !was_active && active {
103                    _ = rt
104                        .sender
105                        .unbounded_send(SchedulerMsg::TaskNotified(self.id));
106                }
107            }
108        })
109        .unwrap_or_else(|e| panic!("{}", e))
110    }
111}
112
113impl Runtime {
114    /// Start a new future on the same thread as the rest of the VirtualDom.
115    ///
116    /// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
117    ///
118    /// This future will not contribute to suspense resolving but it will run during suspense.
119    ///
120    /// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
121    ///
122    /// ```rust, no_run
123    /// # use dioxus::prelude::*;
124    /// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
125    /// let mut state = use_signal(|| None);
126    /// spawn_isomorphic(async move {
127    ///     state.set(Some(reqwest::get("https://api.example.com").await));
128    /// });
129    ///
130    /// // ✅ You may wait for a signal to change and then log it
131    /// let mut state = use_signal(|| 0);
132    /// spawn_isomorphic(async move {
133    ///     loop {
134    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
135    ///         println!("State is {state}");
136    ///     }
137    /// });
138    /// ```
139    pub fn spawn_isomorphic(
140        &self,
141        scope: ScopeId,
142        task: impl Future<Output = ()> + 'static,
143    ) -> Task {
144        self.spawn_task_of_type(scope, task, TaskType::Isomorphic)
145    }
146
147    /// Start a new future on the same thread as the rest of the VirtualDom.
148    ///
149    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
150    /// and long running tasks.
151    ///
152    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
153    ///
154    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
155    /// will only occur when the VirtualDom itself has been dropped.
156    pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
157        self.spawn_task_of_type(scope, task, TaskType::ClientOnly)
158    }
159
160    fn spawn_task_of_type(
161        &self,
162        scope: ScopeId,
163        task: impl Future<Output = ()> + 'static,
164        ty: TaskType,
165    ) -> Task {
166        // Insert the task, temporarily holding a borrow on the tasks map
167        let (task, task_id) = {
168            let mut tasks = self.tasks.borrow_mut();
169
170            let mut task_id = Task::from_id(DefaultKey::default());
171            let mut local_task = None;
172            tasks.insert_with_key(|key| {
173                task_id = Task::from_id(key);
174
175                let new_task = Rc::new(LocalTask {
176                    scope,
177                    active: Cell::new(true),
178                    parent: self.current_task(),
179                    task: RefCell::new(Box::pin(task)),
180                    waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
181                        id: task_id.id,
182                        tx: self.sender.clone(),
183                    })),
184                    ty: RefCell::new(ty),
185                });
186
187                local_task = Some(new_task.clone());
188
189                new_task
190            });
191
192            (local_task.unwrap(), task_id)
193        };
194
195        // Get a borrow on the task, holding no borrows on the tasks map
196        debug_assert!(self.tasks.try_borrow_mut().is_ok());
197        debug_assert!(task.task.try_borrow_mut().is_ok());
198
199        self.sender
200            .unbounded_send(SchedulerMsg::TaskNotified(task_id.id))
201            .expect("Scheduler should exist");
202
203        task_id
204    }
205
206    /// Queue an effect to run after the next render
207    pub(crate) fn queue_effect(&self, id: ScopeId, f: impl FnOnce() + 'static) {
208        let effect = Box::new(f) as Box<dyn FnOnce() + 'static>;
209        let Some(scope) = self.get_state(id) else {
210            return;
211        };
212        let mut status = scope.status.borrow_mut();
213        match &mut *status {
214            ScopeStatus::Mounted => {
215                self.queue_effect_on_mounted_scope(id, effect);
216            }
217            ScopeStatus::Unmounted { effects_queued, .. } => {
218                effects_queued.push(effect);
219            }
220        }
221    }
222
223    /// Queue an effect to run after the next render without checking if the scope is mounted
224    pub(crate) fn queue_effect_on_mounted_scope(
225        &self,
226        id: ScopeId,
227        f: Box<dyn FnOnce() + 'static>,
228    ) {
229        // Add the effect to the queue of effects to run after the next render for the given scope
230        let mut effects = self.pending_effects.borrow_mut();
231        let scope_order = ScopeOrder::new(id.height(), id);
232        match effects.get(&scope_order) {
233            Some(effects) => effects.push_back(f),
234            None => {
235                effects.insert(Effect::new(scope_order, f));
236            }
237        }
238    }
239
240    /// Get the currently running task
241    pub fn current_task(&self) -> Option<Task> {
242        self.current_task.get()
243    }
244
245    /// Get the parent task of the given task, if it exists
246    pub fn parent_task(&self, task: Task) -> Option<Task> {
247        self.tasks.borrow().get(task.id)?.parent
248    }
249
250    pub(crate) fn task_scope(&self, task: Task) -> Option<ScopeId> {
251        self.tasks.borrow().get(task.id).map(|t| t.scope)
252    }
253
254    #[track_caller]
255    pub(crate) fn handle_task_wakeup(&self, id: Task) -> Poll<()> {
256        #[cfg(debug_assertions)]
257        {
258            // Ensure we are currently inside a `Runtime`.
259            Runtime::current().unwrap_or_else(|e| panic!("{}", e));
260        }
261
262        let task = self.tasks.borrow().get(id.id).cloned();
263
264        // The task was removed from the scheduler, so we can just ignore it
265        let Some(task) = task else {
266            return Poll::Ready(());
267        };
268
269        // If a task woke up but is paused, we can just ignore it
270        if !task.active.get() {
271            return Poll::Pending;
272        }
273
274        let mut cx = std::task::Context::from_waker(&task.waker);
275
276        // poll the future with the scope on the stack
277        let poll_result = self.with_scope_on_stack(task.scope, || {
278            self.current_task.set(Some(id));
279
280            let poll_result = task.task.borrow_mut().as_mut().poll(&mut cx);
281
282            if poll_result.is_ready() {
283                // Remove it from the scope so we dont try to double drop it when the scope dropes
284                self.get_state(task.scope)
285                    .unwrap()
286                    .spawned_tasks
287                    .borrow_mut()
288                    .remove(&id);
289
290                self.remove_task(id);
291            }
292
293            poll_result
294        });
295        self.current_task.set(None);
296
297        poll_result
298    }
299
300    /// Drop the future with the given Task
301    ///
302    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
303    pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
304        // Remove the task from the task list
305        let task = self.tasks.borrow_mut().remove(id.id);
306
307        if let Some(task) = &task {
308            // Remove the task from suspense
309            if let TaskType::Suspended { boundary } = &*task.ty.borrow() {
310                self.suspended_tasks.set(self.suspended_tasks.get() - 1);
311                if let SuspenseLocation::UnderSuspense(boundary) = boundary {
312                    boundary.remove_suspended_task(id);
313                }
314            }
315
316            // Remove the task from pending work. We could reuse the slot before the task is polled and discarded so we need to remove it from pending work instead of filtering out dead tasks when we try to poll them
317            if let Some(scope) = self.get_state(task.scope) {
318                let order = ScopeOrder::new(scope.height(), scope.id);
319                if let Some(dirty_tasks) = self.dirty_tasks.borrow_mut().get(&order) {
320                    dirty_tasks.remove(id);
321                }
322            }
323        }
324
325        task
326    }
327
328    /// Check if a task should be run during suspense
329    pub(crate) fn task_runs_during_suspense(&self, task: Task) -> bool {
330        let borrow = self.tasks.borrow();
331        let task: Option<&LocalTask> = borrow.get(task.id).map(|t| &**t);
332        matches!(task, Some(LocalTask { ty, .. }) if ty.borrow().runs_during_suspense())
333    }
334}
335
336/// the task itself is the waker
337pub(crate) struct LocalTask {
338    scope: ScopeId,
339    parent: Option<Task>,
340    task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
341    waker: Waker,
342    ty: RefCell<TaskType>,
343    active: Cell<bool>,
344}
345
346impl LocalTask {
347    /// Suspend the task, returns true if the task was already suspended
348    pub(crate) fn suspend(&self, boundary: SuspenseLocation) -> bool {
349        // Make this a suspended task so it runs during suspense
350        let old_type = self.ty.replace(TaskType::Suspended { boundary });
351        matches!(old_type, TaskType::Suspended { .. })
352    }
353}
354
355#[derive(Clone)]
356enum TaskType {
357    ClientOnly,
358    Suspended { boundary: SuspenseLocation },
359    Isomorphic,
360}
361
362impl TaskType {
363    fn runs_during_suspense(&self) -> bool {
364        matches!(self, TaskType::Isomorphic | TaskType::Suspended { .. })
365    }
366}
367
368/// The type of message that can be sent to the scheduler.
369///
370/// These messages control how the scheduler will process updates to the UI.
371#[derive(Debug)]
372pub(crate) enum SchedulerMsg {
373    /// Immediate updates from Components that mark them as dirty
374    Immediate(ScopeId),
375
376    /// A task has woken and needs to be progressed
377    TaskNotified(slotmap::DefaultKey),
378
379    /// An effect has been queued to run after the next render
380    EffectQueued,
381}
382
383struct LocalTaskHandle {
384    id: slotmap::DefaultKey,
385    tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
386}
387
388impl ArcWake for LocalTaskHandle {
389    fn wake_by_ref(arc_self: &Arc<Self>) {
390        _ = arc_self
391            .tx
392            .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
393    }
394}