dioxus_core/
tasks.rs

1use crate::innerlude::Effect;
2use crate::innerlude::ScopeOrder;
3use crate::innerlude::{remove_future, spawn, Runtime};
4use crate::scope_context::ScopeStatus;
5use crate::scope_context::SuspenseLocation;
6use crate::ScopeId;
7use futures_util::task::ArcWake;
8use slotmap::DefaultKey;
9use std::marker::PhantomData;
10use std::panic;
11use std::sync::Arc;
12use std::task::Waker;
13use std::{cell::Cell, future::Future};
14use std::{cell::RefCell, rc::Rc};
15use std::{pin::Pin, task::Poll};
16
17/// A task's unique identifier.
18///
19/// `Task` is a unique identifier for a task that has been spawned onto the runtime. It can be used to cancel the task
20#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
21#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
22pub struct Task {
23    pub(crate) id: slotmap::DefaultKey,
24    // We add a raw pointer to make this !Send + !Sync
25    unsend: PhantomData<*const ()>,
26}
27
28impl Task {
29    /// Create a task from a raw id
30    pub(crate) const fn from_id(id: slotmap::DefaultKey) -> Self {
31        Self {
32            id,
33            unsend: PhantomData,
34        }
35    }
36
37    /// Start a new future on the same thread as the rest of the VirtualDom.
38    ///
39    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
40    /// and long running tasks.
41    ///
42    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
43    ///
44    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
45    /// will only occur when the VirtualDom itself has been dropped.
46    pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
47        spawn(task)
48    }
49
50    /// Drop the task immediately.
51    ///
52    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
53    pub fn cancel(self) {
54        remove_future(self);
55    }
56
57    /// Pause the task.
58    pub fn pause(&self) {
59        self.set_active(false);
60    }
61
62    /// Resume the task.
63    pub fn resume(&self) {
64        self.set_active(true);
65    }
66
67    /// Check if the task is paused.
68    pub fn paused(&self) -> bool {
69        Runtime::with(|rt| {
70            if let Some(task) = rt.tasks.borrow().get(self.id) {
71                !task.active.get()
72            } else {
73                false
74            }
75        })
76        .unwrap_or_default()
77    }
78
79    /// Wake the task.
80    #[track_caller]
81    pub fn wake(&self) {
82        Runtime::with(|rt| {
83            _ = rt
84                .sender
85                .unbounded_send(SchedulerMsg::TaskNotified(self.id))
86        })
87        .unwrap_or_else(|e| panic!("{}", e))
88    }
89
90    /// Poll the task immediately.
91    #[track_caller]
92    pub fn poll_now(&self) -> Poll<()> {
93        Runtime::with(|rt| rt.handle_task_wakeup(*self)).unwrap_or_else(|e| panic!("{}", e))
94    }
95
96    /// Set the task as active or paused.
97    #[track_caller]
98    pub fn set_active(&self, active: bool) {
99        Runtime::with(|rt| {
100            if let Some(task) = rt.tasks.borrow().get(self.id) {
101                let was_active = task.active.replace(active);
102                if !was_active && active {
103                    _ = rt
104                        .sender
105                        .unbounded_send(SchedulerMsg::TaskNotified(self.id));
106                }
107            }
108        })
109        .unwrap_or_else(|e| panic!("{}", e))
110    }
111}
112
113impl Runtime {
114    /// Start a new future on the same thread as the rest of the VirtualDom.
115    ///
116    /// **You should generally use `spawn` instead of this method unless you specifically need to need to run a task during suspense**
117    ///
118    /// This future will not contribute to suspense resolving but it will run during suspense.
119    ///
120    /// Because this future runs during suspense, you need to be careful to work with hydration. It is not recommended to do any async IO work in this future, as it can easily cause hydration issues. However, you can use isomorphic tasks to do work that can be consistently replicated on the server and client like logging or responding to state changes.
121    ///
122    /// ```rust, no_run
123    /// # use dioxus::prelude::*;
124    /// # use dioxus_core::spawn_isomorphic;
125    /// // ❌ Do not do requests in isomorphic tasks. It may resolve at a different time on the server and client, causing hydration issues.
126    /// let mut state = use_signal(|| None);
127    /// spawn_isomorphic(async move {
128    ///     state.set(Some(reqwest::get("https://api.example.com").await));
129    /// });
130    ///
131    /// // ✅ You may wait for a signal to change and then log it
132    /// let mut state = use_signal(|| 0);
133    /// spawn_isomorphic(async move {
134    ///     loop {
135    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136    ///         println!("State is {state}");
137    ///     }
138    /// });
139    /// ```
140    pub fn spawn_isomorphic(
141        &self,
142        scope: ScopeId,
143        task: impl Future<Output = ()> + 'static,
144    ) -> Task {
145        self.spawn_task_of_type(scope, task, TaskType::Isomorphic)
146    }
147
148    /// Start a new future on the same thread as the rest of the VirtualDom.
149    ///
150    /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
151    /// and long running tasks.
152    ///
153    /// Whenever the component that owns this future is dropped, the future will be dropped as well.
154    ///
155    /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
156    /// will only occur when the VirtualDom itself has been dropped.
157    pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
158        self.spawn_task_of_type(scope, task, TaskType::ClientOnly)
159    }
160
161    fn spawn_task_of_type(
162        &self,
163        scope: ScopeId,
164        task: impl Future<Output = ()> + 'static,
165        ty: TaskType,
166    ) -> Task {
167        self.spawn_task_of_type_inner(scope, Box::pin(task), ty)
168    }
169
170    // a non-momorphic version of spawn_task_of_type, helps with binari sizes
171    fn spawn_task_of_type_inner(
172        &self,
173        scope: ScopeId,
174        pinned_task: Pin<Box<dyn Future<Output = ()>>>,
175        ty: TaskType,
176    ) -> Task {
177        // Insert the task, temporarily holding a borrow on the tasks map
178        let (task, task_id) = {
179            let mut tasks = self.tasks.borrow_mut();
180
181            let mut task_id = Task::from_id(DefaultKey::default());
182            let mut local_task = None;
183            tasks.insert_with_key(|key| {
184                task_id = Task::from_id(key);
185
186                let new_task = Rc::new(LocalTask {
187                    scope,
188                    active: Cell::new(true),
189                    parent: self.current_task(),
190                    task: RefCell::new(pinned_task),
191                    waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
192                        id: task_id.id,
193                        tx: self.sender.clone(),
194                    })),
195                    ty: RefCell::new(ty),
196                });
197
198                local_task = Some(new_task.clone());
199
200                new_task
201            });
202
203            (local_task.unwrap(), task_id)
204        };
205
206        // Get a borrow on the task, holding no borrows on the tasks map
207        debug_assert!(self.tasks.try_borrow_mut().is_ok());
208        debug_assert!(task.task.try_borrow_mut().is_ok());
209
210        self.sender
211            .unbounded_send(SchedulerMsg::TaskNotified(task_id.id))
212            .expect("Scheduler should exist");
213
214        task_id
215    }
216
217    /// Queue an effect to run after the next render
218    pub(crate) fn queue_effect(&self, id: ScopeId, f: impl FnOnce() + 'static) {
219        let effect = Box::new(f) as Box<dyn FnOnce() + 'static>;
220        let Some(scope) = self.get_state(id) else {
221            return;
222        };
223        let mut status = scope.status.borrow_mut();
224        match &mut *status {
225            ScopeStatus::Mounted => {
226                self.queue_effect_on_mounted_scope(id, effect);
227            }
228            ScopeStatus::Unmounted { effects_queued, .. } => {
229                effects_queued.push(effect);
230            }
231        }
232    }
233
234    /// Queue an effect to run after the next render without checking if the scope is mounted
235    pub(crate) fn queue_effect_on_mounted_scope(
236        &self,
237        id: ScopeId,
238        f: Box<dyn FnOnce() + 'static>,
239    ) {
240        // Add the effect to the queue of effects to run after the next render for the given scope
241        let mut effects = self.pending_effects.borrow_mut();
242        let scope_order = ScopeOrder::new(id.height(), id);
243        match effects.get(&scope_order) {
244            Some(effects) => effects.push_back(f),
245            None => {
246                effects.insert(Effect::new(scope_order, f));
247            }
248        }
249    }
250
251    /// Get the currently running task
252    pub fn current_task(&self) -> Option<Task> {
253        self.current_task.get()
254    }
255
256    /// Get the parent task of the given task, if it exists
257    pub fn parent_task(&self, task: Task) -> Option<Task> {
258        self.tasks.borrow().get(task.id)?.parent
259    }
260
261    pub(crate) fn task_scope(&self, task: Task) -> Option<ScopeId> {
262        self.tasks.borrow().get(task.id).map(|t| t.scope)
263    }
264
265    #[track_caller]
266    pub(crate) fn handle_task_wakeup(&self, id: Task) -> Poll<()> {
267        #[cfg(debug_assertions)]
268        {
269            // Ensure we are currently inside a `Runtime`.
270            Runtime::current().unwrap_or_else(|e| panic!("{}", e));
271        }
272
273        let task = self.tasks.borrow().get(id.id).cloned();
274
275        // The task was removed from the scheduler, so we can just ignore it
276        let Some(task) = task else {
277            return Poll::Ready(());
278        };
279
280        // If a task woke up but is paused, we can just ignore it
281        if !task.active.get() {
282            return Poll::Pending;
283        }
284
285        let mut cx = std::task::Context::from_waker(&task.waker);
286
287        // poll the future with the scope on the stack
288        let poll_result = self.with_scope_on_stack(task.scope, || {
289            self.current_task.set(Some(id));
290
291            let poll_result = task.task.borrow_mut().as_mut().poll(&mut cx);
292
293            if poll_result.is_ready() {
294                // Remove it from the scope so we dont try to double drop it when the scope dropes
295                self.get_state(task.scope)
296                    .unwrap()
297                    .spawned_tasks
298                    .borrow_mut()
299                    .remove(&id);
300
301                self.remove_task(id);
302            }
303
304            poll_result
305        });
306        self.current_task.set(None);
307
308        poll_result
309    }
310
311    /// Drop the future with the given Task
312    ///
313    /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
314    pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
315        // Remove the task from the task list
316        let task = self.tasks.borrow_mut().remove(id.id);
317
318        if let Some(task) = &task {
319            // Remove the task from suspense
320            if let TaskType::Suspended { boundary } = &*task.ty.borrow() {
321                self.suspended_tasks.set(self.suspended_tasks.get() - 1);
322                if let SuspenseLocation::UnderSuspense(boundary) = boundary {
323                    boundary.remove_suspended_task(id);
324                }
325            }
326
327            // Remove the task from pending work. We could reuse the slot before the task is polled and discarded so we need to remove it from pending work instead of filtering out dead tasks when we try to poll them
328            if let Some(scope) = self.get_state(task.scope) {
329                let order = ScopeOrder::new(scope.height(), scope.id);
330                if let Some(dirty_tasks) = self.dirty_tasks.borrow_mut().get(&order) {
331                    dirty_tasks.remove(id);
332                }
333            }
334        }
335
336        task
337    }
338
339    /// Check if a task should be run during suspense
340    pub(crate) fn task_runs_during_suspense(&self, task: Task) -> bool {
341        let borrow = self.tasks.borrow();
342        let task: Option<&LocalTask> = borrow.get(task.id).map(|t| &**t);
343        matches!(task, Some(LocalTask { ty, .. }) if ty.borrow().runs_during_suspense())
344    }
345}
346
347/// the task itself is the waker
348pub(crate) struct LocalTask {
349    scope: ScopeId,
350    parent: Option<Task>,
351    task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
352    waker: Waker,
353    ty: RefCell<TaskType>,
354    active: Cell<bool>,
355}
356
357impl LocalTask {
358    /// Suspend the task, returns true if the task was already suspended
359    pub(crate) fn suspend(&self, boundary: SuspenseLocation) -> bool {
360        // Make this a suspended task so it runs during suspense
361        let old_type = self.ty.replace(TaskType::Suspended { boundary });
362        matches!(old_type, TaskType::Suspended { .. })
363    }
364}
365
366#[derive(Clone)]
367enum TaskType {
368    ClientOnly,
369    Suspended { boundary: SuspenseLocation },
370    Isomorphic,
371}
372
373impl TaskType {
374    fn runs_during_suspense(&self) -> bool {
375        matches!(self, TaskType::Isomorphic | TaskType::Suspended { .. })
376    }
377}
378
/// The type of message that can be sent to the scheduler.
///
/// These messages control how the scheduler will process updates to the UI. Within this file
/// only `TaskNotified` is produced: when a task is spawned, woken, or resumed.
#[derive(Debug)]
pub(crate) enum SchedulerMsg {
    /// All components have been marked as dirty, requiring a full render
    AllDirty,

    /// Immediate updates from Components that mark them as dirty
    Immediate(ScopeId),

    /// A task has woken and needs to be progressed; carries the slot-map key of that task
    TaskNotified(slotmap::DefaultKey),

    /// An effect has been queued to run after the next render
    EffectQueued,
}
396
397struct LocalTaskHandle {
398    id: slotmap::DefaultKey,
399    tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
400}
401
402impl ArcWake for LocalTaskHandle {
403    fn wake_by_ref(arc_self: &Arc<Self>) {
404        _ = arc_self
405            .tx
406            .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
407    }
408}