1use crate::innerlude::Effect;
2use crate::innerlude::ScopeOrder;
3use crate::innerlude::{remove_future, spawn, Runtime};
4use crate::scope_context::ScopeStatus;
5use crate::scope_context::SuspenseLocation;
6use crate::ScopeId;
7use futures_util::task::ArcWake;
8use slotmap::DefaultKey;
9use std::marker::PhantomData;
10use std::panic;
11use std::sync::Arc;
12use std::task::Waker;
13use std::{cell::Cell, future::Future};
14use std::{cell::RefCell, rc::Rc};
15use std::{pin::Pin, task::Poll};
16
/// A small, copyable handle to a future that was spawned on the [`Runtime`].
///
/// The handle only stores the slot-map key of the task. Every operation looks
/// the task up in the runtime's task table first, so a handle whose task has
/// already been removed is still safe to use.
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct Task {
    /// Key of this task in the runtime's `tasks` slot map.
    pub(crate) id: slotmap::DefaultKey,
    /// Raw-pointer marker: makes the handle neither `Send` nor `Sync`.
    unsend: PhantomData<*const ()>,
}
27
28impl Task {
29 pub(crate) const fn from_id(id: slotmap::DefaultKey) -> Self {
31 Self {
32 id,
33 unsend: PhantomData,
34 }
35 }
36
37 pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
47 spawn(task)
48 }
49
50 pub fn cancel(self) {
54 remove_future(self);
55 }
56
57 pub fn pause(&self) {
59 self.set_active(false);
60 }
61
62 pub fn resume(&self) {
64 self.set_active(true);
65 }
66
67 pub fn paused(&self) -> bool {
69 Runtime::with(|rt| {
70 if let Some(task) = rt.tasks.borrow().get(self.id) {
71 !task.active.get()
72 } else {
73 false
74 }
75 })
76 .unwrap_or_default()
77 }
78
79 #[track_caller]
81 pub fn wake(&self) {
82 Runtime::with(|rt| {
83 _ = rt
84 .sender
85 .unbounded_send(SchedulerMsg::TaskNotified(self.id))
86 })
87 .unwrap_or_else(|e| panic!("{}", e))
88 }
89
90 #[track_caller]
92 pub fn poll_now(&self) -> Poll<()> {
93 Runtime::with(|rt| rt.handle_task_wakeup(*self)).unwrap_or_else(|e| panic!("{}", e))
94 }
95
96 #[track_caller]
98 pub fn set_active(&self, active: bool) {
99 Runtime::with(|rt| {
100 if let Some(task) = rt.tasks.borrow().get(self.id) {
101 let was_active = task.active.replace(active);
102 if !was_active && active {
103 _ = rt
104 .sender
105 .unbounded_send(SchedulerMsg::TaskNotified(self.id));
106 }
107 }
108 })
109 .unwrap_or_else(|e| panic!("{}", e))
110 }
111}
112
113impl Runtime {
114 pub fn spawn_isomorphic(
140 &self,
141 scope: ScopeId,
142 task: impl Future<Output = ()> + 'static,
143 ) -> Task {
144 self.spawn_task_of_type(scope, task, TaskType::Isomorphic)
145 }
146
147 pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
157 self.spawn_task_of_type(scope, task, TaskType::ClientOnly)
158 }
159
160 fn spawn_task_of_type(
161 &self,
162 scope: ScopeId,
163 task: impl Future<Output = ()> + 'static,
164 ty: TaskType,
165 ) -> Task {
166 let (task, task_id) = {
168 let mut tasks = self.tasks.borrow_mut();
169
170 let mut task_id = Task::from_id(DefaultKey::default());
171 let mut local_task = None;
172 tasks.insert_with_key(|key| {
173 task_id = Task::from_id(key);
174
175 let new_task = Rc::new(LocalTask {
176 scope,
177 active: Cell::new(true),
178 parent: self.current_task(),
179 task: RefCell::new(Box::pin(task)),
180 waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
181 id: task_id.id,
182 tx: self.sender.clone(),
183 })),
184 ty: RefCell::new(ty),
185 });
186
187 local_task = Some(new_task.clone());
188
189 new_task
190 });
191
192 (local_task.unwrap(), task_id)
193 };
194
195 debug_assert!(self.tasks.try_borrow_mut().is_ok());
197 debug_assert!(task.task.try_borrow_mut().is_ok());
198
199 self.sender
200 .unbounded_send(SchedulerMsg::TaskNotified(task_id.id))
201 .expect("Scheduler should exist");
202
203 task_id
204 }
205
206 pub(crate) fn queue_effect(&self, id: ScopeId, f: impl FnOnce() + 'static) {
208 let effect = Box::new(f) as Box<dyn FnOnce() + 'static>;
209 let Some(scope) = self.get_state(id) else {
210 return;
211 };
212 let mut status = scope.status.borrow_mut();
213 match &mut *status {
214 ScopeStatus::Mounted => {
215 self.queue_effect_on_mounted_scope(id, effect);
216 }
217 ScopeStatus::Unmounted { effects_queued, .. } => {
218 effects_queued.push(effect);
219 }
220 }
221 }
222
223 pub(crate) fn queue_effect_on_mounted_scope(
225 &self,
226 id: ScopeId,
227 f: Box<dyn FnOnce() + 'static>,
228 ) {
229 let mut effects = self.pending_effects.borrow_mut();
231 let scope_order = ScopeOrder::new(id.height(), id);
232 match effects.get(&scope_order) {
233 Some(effects) => effects.push_back(f),
234 None => {
235 effects.insert(Effect::new(scope_order, f));
236 }
237 }
238 }
239
240 pub fn current_task(&self) -> Option<Task> {
242 self.current_task.get()
243 }
244
245 pub fn parent_task(&self, task: Task) -> Option<Task> {
247 self.tasks.borrow().get(task.id)?.parent
248 }
249
250 pub(crate) fn task_scope(&self, task: Task) -> Option<ScopeId> {
251 self.tasks.borrow().get(task.id).map(|t| t.scope)
252 }
253
254 #[track_caller]
255 pub(crate) fn handle_task_wakeup(&self, id: Task) -> Poll<()> {
256 #[cfg(debug_assertions)]
257 {
258 Runtime::current().unwrap_or_else(|e| panic!("{}", e));
260 }
261
262 let task = self.tasks.borrow().get(id.id).cloned();
263
264 let Some(task) = task else {
266 return Poll::Ready(());
267 };
268
269 if !task.active.get() {
271 return Poll::Pending;
272 }
273
274 let mut cx = std::task::Context::from_waker(&task.waker);
275
276 let poll_result = self.with_scope_on_stack(task.scope, || {
278 self.current_task.set(Some(id));
279
280 let poll_result = task.task.borrow_mut().as_mut().poll(&mut cx);
281
282 if poll_result.is_ready() {
283 self.get_state(task.scope)
285 .unwrap()
286 .spawned_tasks
287 .borrow_mut()
288 .remove(&id);
289
290 self.remove_task(id);
291 }
292
293 poll_result
294 });
295 self.current_task.set(None);
296
297 poll_result
298 }
299
300 pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
304 let task = self.tasks.borrow_mut().remove(id.id);
306
307 if let Some(task) = &task {
308 if let TaskType::Suspended { boundary } = &*task.ty.borrow() {
310 self.suspended_tasks.set(self.suspended_tasks.get() - 1);
311 if let SuspenseLocation::UnderSuspense(boundary) = boundary {
312 boundary.remove_suspended_task(id);
313 }
314 }
315
316 if let Some(scope) = self.get_state(task.scope) {
318 let order = ScopeOrder::new(scope.height(), scope.id);
319 if let Some(dirty_tasks) = self.dirty_tasks.borrow_mut().get(&order) {
320 dirty_tasks.remove(id);
321 }
322 }
323 }
324
325 task
326 }
327
328 pub(crate) fn task_runs_during_suspense(&self, task: Task) -> bool {
330 let borrow = self.tasks.borrow();
331 let task: Option<&LocalTask> = borrow.get(task.id).map(|t| &**t);
332 matches!(task, Some(LocalTask { ty, .. }) if ty.borrow().runs_during_suspense())
333 }
334}
335
/// Bookkeeping the runtime stores for every spawned future.
pub(crate) struct LocalTask {
    /// Scope the task was spawned in; it is put on the scope stack while the
    /// task is polled.
    scope: ScopeId,
    /// Task that was current when this one was spawned, if any.
    parent: Option<Task>,
    /// The pinned, boxed future; mutably borrowed for the duration of a poll.
    task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
    /// Waker handed to the future; waking it sends
    /// `SchedulerMsg::TaskNotified` for this task to the scheduler.
    waker: Waker,
    /// Current task type; replaced with `TaskType::Suspended` by `suspend`.
    ty: RefCell<TaskType>,
    /// `false` while the task is paused; paused tasks are not polled.
    active: Cell<bool>,
}
345
346impl LocalTask {
347 pub(crate) fn suspend(&self, boundary: SuspenseLocation) -> bool {
349 let old_type = self.ty.replace(TaskType::Suspended { boundary });
351 matches!(old_type, TaskType::Suspended { .. })
352 }
353}
354
/// Classifies a task by how it interacts with suspense.
#[derive(Clone)]
enum TaskType {
    /// Assigned by `Runtime::spawn`; such a task is not reported as running
    /// during suspense.
    ClientOnly,
    /// Assigned by `LocalTask::suspend`; `boundary` is the suspense location
    /// that was passed to `suspend`. Reported as running during suspense.
    Suspended { boundary: SuspenseLocation },
    /// Assigned by `Runtime::spawn_isomorphic`; reported as running during
    /// suspense.
    Isomorphic,
}
361
362impl TaskType {
363 fn runs_during_suspense(&self) -> bool {
364 matches!(self, TaskType::Isomorphic | TaskType::Suspended { .. })
365 }
366}
367
/// Messages sent to the scheduler over the runtime's unbounded channel.
#[derive(Debug)]
pub(crate) enum SchedulerMsg {
    /// Carries the id of a scope.
    /// NOTE(review): presumably requests an immediate update of that scope;
    /// it is not produced in this part of the file — confirm against the senders.
    Immediate(ScopeId),

    /// The task with this key was spawned, woken or resumed and should be
    /// polled.
    TaskNotified(slotmap::DefaultKey),

    /// NOTE(review): presumably signals that an effect was queued; it is not
    /// produced in this part of the file — confirm against the senders.
    EffectQueued,
}
382
/// Data behind a task's `Waker`: everything needed to tell the scheduler
/// that the task should be polled again.
struct LocalTaskHandle {
    /// Slot-map key of the task this handle wakes.
    id: slotmap::DefaultKey,
    /// Sending half of the scheduler's message channel.
    tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
}
387
388impl ArcWake for LocalTaskHandle {
389 fn wake_by_ref(arc_self: &Arc<Self>) {
390 _ = arc_self
391 .tx
392 .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
393 }
394}