Skip to main content

vibe_ready/api/
scheduler.rs

1//! Task scheduler primitives (B9 capability).
2//!
3//! Provides delayed/periodic execution, cancellation tokens, priority lanes,
4//! and an inspection panel returning live snapshots of in-flight tasks.
5
6use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
7use crate::api::engine_executor::VibeEngineTask;
8use crate::log::log_def::DESC;
9use crate::{log_e, platform};
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::runtime::Handle;
17use tokio::sync::mpsc::{channel, Sender};
18use tokio::sync::Notify;
19
20fn scheduler_lock_error(context: impl Into<String>) -> VibeEngineError {
21    VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError).with_context(context)
22}
23
/// Priority lane used by [`crate::VibeEngine::post_with_priority`] and friends.
///
/// The discriminants are fixed (`High = 0`, `Normal = 1`, `Low = 2`) because
/// the scheduler uses them to index its table of lane senders.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VibeTaskPriority {
    /// Dispatched ahead of normal and low priority work that is ready at the same time.
    High = 0,
    /// Default lane for scheduled work.
    Normal = 1,
    /// Dispatched only when no high or normal priority work is ready.
    Low = 2,
}

impl VibeTaskPriority {
    /// Lower-case lane name used in log messages.
    fn as_str(self) -> &'static str {
        // Indexed by discriminant, so the order must follow the enum above.
        const NAMES: [&str; 3] = ["high", "normal", "low"];
        NAMES[self as usize]
    }
}
45
/// Records which scheduling entry point created a task; fixed for the task's lifetime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VibeTaskKind {
    /// Runs once on a priority lane; created by `post_with_priority`.
    Once,
    /// Runs once after a delay; created by `schedule_after`.
    Delayed,
    /// Runs repeatedly until cancelled; created by `schedule_every`.
    Periodic,
}
56
/// Where a scheduler-tracked task currently is in its lifecycle.
///
/// `Completed`, `Cancelled`, and `Failed` are terminal states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VibeTaskState {
    /// Registered with the scheduler but not started yet.
    Pending,
    /// The task has been marked as started.
    Running,
    /// The task's future ran to the end without cancellation being requested.
    Completed,
    /// Cancellation was requested and the task stopped.
    Cancelled,
    /// The task panicked or could not be handed to its lane.
    Failed,
}
71
72/// Cooperative cancellation token passed to scheduled tasks.
73///
74/// Tasks may poll [`VibeCancellationToken::is_cancelled`] before/after async
75/// points or `await` [`VibeCancellationToken::cancelled`] to be notified the
76/// moment cancellation is requested.
77#[derive(Clone)]
78pub struct VibeCancellationToken {
79    flag: Arc<AtomicBool>,
80    notify: Arc<Notify>,
81}
82
83impl VibeCancellationToken {
84    /// Creates a cancellation token in the non-cancelled state.
85    ///
86    /// # Returns
87    ///
88    /// A new [`VibeCancellationToken`] that can be cloned and shared with tasks.
89    ///
90    /// # Examples
91    ///
92    /// ```
93    /// use vibe_ready::VibeCancellationToken;
94    ///
95    /// let token = VibeCancellationToken::new();
96    /// assert!(!token.is_cancelled());
97    /// token.cancel();
98    /// assert!(token.is_cancelled());
99    /// ```
100    pub fn new() -> Self {
101        Self {
102            flag: Arc::new(AtomicBool::new(false)),
103            notify: Arc::new(Notify::new()),
104        }
105    }
106
107    /// Trip the token; idempotent.
108    ///
109    /// # Returns
110    ///
111    /// This method returns `()` and wakes tasks waiting in
112    /// [`VibeCancellationToken::cancelled`].
113    pub fn cancel(&self) {
114        if !self.flag.swap(true, Ordering::AcqRel) {
115            self.notify.notify_waiters();
116        }
117    }
118
119    /// Checks whether cancellation has been requested.
120    ///
121    /// # Returns
122    ///
123    /// `true` after [`VibeCancellationToken::cancel`] has been called.
124    pub fn is_cancelled(&self) -> bool {
125        self.flag.load(Ordering::Acquire)
126    }
127
128    /// Resolves once the token has been tripped.
129    ///
130    /// # Returns
131    ///
132    /// This async method returns `()` after cancellation is observed.
133    pub async fn cancelled(&self) {
134        loop {
135            if self.is_cancelled() {
136                return;
137            }
138            let waiter = self.notify.notified();
139            if self.is_cancelled() {
140                return;
141            }
142            waiter.await;
143            if self.is_cancelled() {
144                return;
145            }
146        }
147    }
148}
149
150impl Default for VibeCancellationToken {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
/// Timestamps recorded over a task's lifetime, as returned by `platform::now()`.
#[derive(Debug, Clone)]
struct TaskTimestamps {
    /// When the task was created and registered.
    created_at_ms: i64,
    /// When the task was first marked as started; `None` until then.
    started_at_ms: Option<i64>,
    /// When the task first reached a terminal state; `None` until then.
    finished_at_ms: Option<i64>,
}
162
163struct TaskInner {
164    id: u64,
165    name: String,
166    kind: VibeTaskKind,
167    priority: VibeTaskPriority,
168    token: VibeCancellationToken,
169    state: Mutex<VibeTaskState>,
170    timestamps: Mutex<TaskTimestamps>,
171    finished: Notify,
172}
173
174impl TaskInner {
175    fn snapshot(&self) -> Result<VibeTaskInfo, VibeEngineError> {
176        let state = *self
177            .state
178            .lock()
179            .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
180        let ts = self
181            .timestamps
182            .lock()
183            .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?
184            .clone();
185        Ok(VibeTaskInfo {
186            id: self.id,
187            name: self.name.clone(),
188            kind: self.kind,
189            priority: self.priority,
190            state,
191            created_at_ms: ts.created_at_ms,
192            started_at_ms: ts.started_at_ms,
193            finished_at_ms: ts.finished_at_ms,
194        })
195    }
196
197    fn set_state(&self, new_state: VibeTaskState) -> Result<(), VibeEngineError> {
198        let mut guard = self
199            .state
200            .lock()
201            .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
202        if matches!(
203            *guard,
204            VibeTaskState::Completed | VibeTaskState::Cancelled | VibeTaskState::Failed
205        ) {
206            return Ok(());
207        }
208        *guard = new_state;
209        Ok(())
210    }
211
212    fn mark_started(&self) -> Result<(), VibeEngineError> {
213        self.set_state(VibeTaskState::Running)?;
214        let mut ts = self
215            .timestamps
216            .lock()
217            .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?;
218        if ts.started_at_ms.is_none() {
219            ts.started_at_ms = Some(platform::now());
220        }
221        Ok(())
222    }
223
224    fn finish(&self, final_state: VibeTaskState) -> Result<(), VibeEngineError> {
225        {
226            let mut guard = self
227                .state
228                .lock()
229                .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
230            *guard = final_state;
231        }
232        {
233            let mut ts = self
234                .timestamps
235                .lock()
236                .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?;
237            if ts.finished_at_ms.is_none() {
238                ts.finished_at_ms = Some(platform::now());
239            }
240        }
241        self.finished.notify_waiters();
242        Ok(())
243    }
244}
245
246/// Snapshot of a scheduler-tracked task, returned by [`VibeTaskPanel::list`].
247#[derive(Clone, Debug)]
248pub struct VibeTaskInfo {
249    /// Unique task id assigned by the scheduler.
250    pub id: u64,
251    /// Human-readable task name supplied by the caller.
252    pub name: String,
253    /// How the task was scheduled.
254    pub kind: VibeTaskKind,
255    /// Priority lane used by the task.
256    pub priority: VibeTaskPriority,
257    /// Current lifecycle state.
258    pub state: VibeTaskState,
259    /// Creation timestamp in Unix milliseconds.
260    pub created_at_ms: i64,
261    /// Start timestamp in Unix milliseconds, if the task has started.
262    pub started_at_ms: Option<i64>,
263    /// Finish timestamp in Unix milliseconds, if the task has finished.
264    pub finished_at_ms: Option<i64>,
265}
266
267/// Handle returned by every scheduling API.
268///
269/// Cloning is cheap: all clones share the same task state.
270#[derive(Clone)]
271pub struct VibeTaskHandle {
272    inner: Arc<TaskInner>,
273}
274
275impl VibeTaskHandle {
276    /// Returns the scheduler-assigned task id.
277    ///
278    /// # Returns
279    ///
280    /// A unique task id for this engine instance.
281    pub fn id(&self) -> u64 {
282        self.inner.id
283    }
284
285    /// Returns the task name.
286    ///
287    /// # Returns
288    ///
289    /// A borrowed task name supplied when the task was scheduled.
290    pub fn name(&self) -> &str {
291        &self.inner.name
292    }
293
294    /// Returns the task kind.
295    ///
296    /// # Returns
297    ///
298    /// A [`VibeTaskKind`] describing whether the task is once, delayed, or periodic.
299    pub fn kind(&self) -> VibeTaskKind {
300        self.inner.kind
301    }
302
303    /// Returns the task priority.
304    ///
305    /// # Returns
306    ///
307    /// The [`VibeTaskPriority`] lane used by this task.
308    pub fn priority(&self) -> VibeTaskPriority {
309        self.inner.priority
310    }
311
312    /// Returns the current task state.
313    ///
314    /// # Returns
315    ///
316    /// `Ok(VibeTaskState)` or [`VibeEngineError`] if scheduler state is poisoned.
317    pub fn state(&self) -> Result<VibeTaskState, VibeEngineError> {
318        self.inner
319            .state
320            .lock()
321            .map(|guard| *guard)
322            .map_err(|_| scheduler_lock_error("task state lock poisoned"))
323    }
324
325    /// Returns the cancellation token shared with the task.
326    ///
327    /// # Returns
328    ///
329    /// A clone of the task's [`VibeCancellationToken`].
330    pub fn token(&self) -> VibeCancellationToken {
331        self.inner.token.clone()
332    }
333
334    /// Returns a point-in-time snapshot for this task.
335    ///
336    /// # Returns
337    ///
338    /// `Ok(VibeTaskInfo)` with identifiers, state, and timestamps.
339    pub fn info(&self) -> Result<VibeTaskInfo, VibeEngineError> {
340        self.inner.snapshot()
341    }
342
343    /// Trips the cancellation token. The task itself decides when to bail.
344    /// Periodic tasks observe the token before scheduling the next iteration
345    /// and `await` callers of [`VibeTaskHandle::join`] eventually return
346    /// `Err(Cancelled)`.
347    ///
348    /// # Returns
349    ///
350    /// This method returns `()` after requesting cancellation.
351    pub fn cancel(&self) {
352        self.inner.token.cancel();
353    }
354
355    /// Checks whether the task is in a terminal state.
356    ///
357    /// # Returns
358    ///
359    /// `Ok(true)` for completed, cancelled, or failed tasks.
360    pub fn is_finished(&self) -> Result<bool, VibeEngineError> {
361        Ok(matches!(
362            self.state()?,
363            VibeTaskState::Completed | VibeTaskState::Cancelled | VibeTaskState::Failed
364        ))
365    }
366
367    /// Async join: resolves to `Ok(())` on completion, `Err(Cancelled)` on cancellation,
368    /// or `Err(InternalError)` on panic.
369    pub async fn join(&self) -> Result<(), VibeEngineError> {
370        loop {
371            match self.state()? {
372                VibeTaskState::Completed => return Ok(()),
373                VibeTaskState::Cancelled => {
374                    return Err(VibeEngineError::from_error_code(
375                        VibeEngineErrorCode::Cancelled,
376                    ));
377                }
378                VibeTaskState::Failed => {
379                    return Err(VibeEngineError::from_error_code(
380                        VibeEngineErrorCode::InternalError,
381                    ));
382                }
383                VibeTaskState::Pending | VibeTaskState::Running => {}
384            }
385            let notified = self.inner.finished.notified();
386            if self.is_finished()? {
387                continue;
388            }
389            notified.await;
390        }
391    }
392}
393
394/// Diagnostic snapshot of all live tasks owned by the engine scheduler.
395#[derive(Clone)]
396pub struct VibeTaskPanel {
397    registry: Arc<TaskRegistry>,
398}
399
400impl VibeTaskPanel {
401    /// Lists live scheduler task snapshots.
402    ///
403    /// # Returns
404    ///
405    /// A vector of [`VibeTaskInfo`] values for tasks still tracked by the scheduler.
406    ///
407    /// # Examples
408    ///
409    /// ```no_run
410    /// # use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
411    /// # fn demo() -> VibeResult<()> {
412    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
413    /// let tasks = engine.tasks().list()?;
414    /// assert!(tasks.is_empty());
415    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
416    /// # Ok(())
417    /// # }
418    /// ```
419    pub fn list(&self) -> Result<Vec<VibeTaskInfo>, VibeEngineError> {
420        self.registry.snapshot()
421    }
422
423    /// Counts live scheduler tasks.
424    ///
425    /// # Returns
426    ///
427    /// The number of tasks currently tracked by the scheduler.
428    pub fn count(&self) -> Result<usize, VibeEngineError> {
429        self.registry.len()
430    }
431}
432
433struct TaskRegistry {
434    tasks: Mutex<HashMap<u64, Arc<TaskInner>>>,
435}
436
437impl TaskRegistry {
438    fn new() -> Self {
439        Self {
440            tasks: Mutex::new(HashMap::new()),
441        }
442    }
443
444    fn insert(&self, task: Arc<TaskInner>) -> Result<(), VibeEngineError> {
445        self.tasks
446            .lock()
447            .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
448            .insert(task.id, task);
449        Ok(())
450    }
451
452    fn remove(&self, id: u64) -> Result<(), VibeEngineError> {
453        self.tasks
454            .lock()
455            .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
456            .remove(&id);
457        Ok(())
458    }
459
460    fn snapshot(&self) -> Result<Vec<VibeTaskInfo>, VibeEngineError> {
461        let guard = self
462            .tasks
463            .lock()
464            .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?;
465        guard.values().map(|t| t.snapshot()).collect()
466    }
467
468    fn len(&self) -> Result<usize, VibeEngineError> {
469        Ok(self
470            .tasks
471            .lock()
472            .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
473            .len())
474    }
475
476    fn cancel_all(&self) -> Result<Vec<Arc<TaskInner>>, VibeEngineError> {
477        let guard = self
478            .tasks
479            .lock()
480            .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?;
481        let snapshot: Vec<Arc<TaskInner>> = guard.values().cloned().collect();
482        for task in &snapshot {
483            task.token.cancel();
484        }
485        Ok(snapshot)
486    }
487}
488
489/// Internal scheduler driving priority lanes and tracking task state.
490pub(crate) struct VibeTaskScheduler {
491    handle: Handle,
492    registry: Arc<TaskRegistry>,
493    next_id: AtomicU64,
494    senders: Mutex<Option<[Sender<VibeEngineTask>; 3]>>,
495}
496
497impl VibeTaskScheduler {
498    pub(crate) fn new(handle: Handle, capacity: usize) -> Arc<Self> {
499        let cap = capacity.max(1);
500        let (high_tx, mut high_rx) = channel::<VibeEngineTask>(cap);
501        let (normal_tx, mut normal_rx) = channel::<VibeEngineTask>(cap);
502        let (low_tx, mut low_rx) = channel::<VibeEngineTask>(cap);
503
504        // Single dispatcher that drains the three priority channels with a
505        // biased select: when multiple lanes have work ready, the high lane
506        // wins, then normal, then low. Each pulled task is awaited inline so
507        // ordering of dispatch is preserved (priority cannot be circumvented
508        // by tokio's scheduler once we've handed work to it).
509        handle.spawn(async move {
510            loop {
511                tokio::select! {
512                    biased;
513                    maybe = high_rx.recv() => {
514                        match maybe {
515                            Some(task) => task.await,
516                            None => {
517                                // High lane closed: drain remaining lanes
518                                // before exiting so already-queued work runs.
519                                while let Some(task) = normal_rx.recv().await {
520                                    task.await;
521                                }
522                                while let Some(task) = low_rx.recv().await {
523                                    task.await;
524                                }
525                                break;
526                            }
527                        }
528                    }
529                    maybe = normal_rx.recv() => {
530                        if let Some(task) = maybe {
531                            task.await;
532                        }
533                    }
534                    maybe = low_rx.recv() => {
535                        if let Some(task) = maybe {
536                            task.await;
537                        }
538                    }
539                }
540            }
541        });
542
543        Arc::new(Self {
544            handle,
545            registry: Arc::new(TaskRegistry::new()),
546            next_id: AtomicU64::new(1),
547            senders: Mutex::new(Some([high_tx, normal_tx, low_tx])),
548        })
549    }
550
551    pub(crate) fn panel(self: &Arc<Self>) -> VibeTaskPanel {
552        VibeTaskPanel {
553            registry: Arc::clone(&self.registry),
554        }
555    }
556
557    fn make_task(
558        &self,
559        name: String,
560        kind: VibeTaskKind,
561        priority: VibeTaskPriority,
562    ) -> Result<Arc<TaskInner>, VibeEngineError> {
563        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
564        let inner = Arc::new(TaskInner {
565            id,
566            name,
567            kind,
568            priority,
569            token: VibeCancellationToken::new(),
570            state: Mutex::new(VibeTaskState::Pending),
571            timestamps: Mutex::new(TaskTimestamps {
572                created_at_ms: platform::now(),
573                started_at_ms: None,
574                finished_at_ms: None,
575            }),
576            finished: Notify::new(),
577        });
578        self.registry.insert(Arc::clone(&inner))?;
579        Ok(inner)
580    }
581
582    fn priority_sender(
583        &self,
584        priority: VibeTaskPriority,
585    ) -> Result<Sender<VibeEngineTask>, VibeEngineError> {
586        let guard = self
587            .senders
588            .lock()
589            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
590        let array = guard
591            .as_ref()
592            .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::PostError))?;
593        Ok(array[priority as usize].clone())
594    }
595
596    /// Send a future to the priority dispatcher and track it in the registry.
597    pub(crate) fn post_with_priority<F>(
598        &self,
599        name: impl Into<String>,
600        priority: VibeTaskPriority,
601        future: F,
602    ) -> Result<VibeTaskHandle, VibeEngineError>
603    where
604        F: Future<Output = ()> + Send + 'static,
605    {
606        let task = self.make_task(name.into(), VibeTaskKind::Once, priority)?;
607        let registry = Arc::clone(&self.registry);
608        let task_for_run = Arc::clone(&task);
609        let token = task.token.clone();
610        let wrapped: VibeEngineTask = Box::pin(async move {
611            if token.is_cancelled() {
612                if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
613                    log_e!(
614                        "scheduler.post_with_priority",
615                        DESC,
616                        format!("finish failed: {error}")
617                    );
618                }
619                if let Err(error) = registry.remove(task_for_run.id) {
620                    log_e!(
621                        "scheduler.post_with_priority",
622                        DESC,
623                        format!("registry remove failed: {error}")
624                    );
625                }
626                return;
627            }
628            if let Err(error) = task_for_run.mark_started() {
629                log_e!(
630                    "scheduler.post_with_priority",
631                    DESC,
632                    format!("mark started failed: {error}")
633                );
634                return;
635            }
636            let final_state = run_user_future(Box::pin(future), &token).await;
637            if let Err(error) = task_for_run.finish(final_state) {
638                log_e!(
639                    "scheduler.post_with_priority",
640                    DESC,
641                    format!("finish failed: {error}")
642                );
643            }
644            if let Err(error) = registry.remove(task_for_run.id) {
645                log_e!(
646                    "scheduler.post_with_priority",
647                    DESC,
648                    format!("registry remove failed: {error}")
649                );
650            }
651        });
652
653        let sender = self.priority_sender(priority)?;
654        let task_for_send = Arc::clone(&task);
655        let registry_for_send = Arc::clone(&self.registry);
656        // Non-blocking send so callers running on the tokio runtime thread
657        // never deadlock. The capacity is configured high enough that this
658        // path should virtually never fail under normal load.
659        if let Err(err) = sender.try_send(wrapped) {
660            log_e!(
661                "scheduler.post_with_priority",
662                DESC,
663                format!("send to priority lane {} failed: {err}", priority.as_str())
664            );
665            if let Err(error) = task_for_send.finish(VibeTaskState::Failed) {
666                log_e!(
667                    "scheduler.post_with_priority",
668                    DESC,
669                    format!("finish failed: {error}")
670                );
671            }
672            if let Err(error) = registry_for_send.remove(task_for_send.id) {
673                log_e!(
674                    "scheduler.post_with_priority",
675                    DESC,
676                    format!("registry remove failed: {error}")
677                );
678            }
679            return Err(VibeEngineError::from_error_code(
680                VibeEngineErrorCode::PostError,
681            ));
682        }
683        Ok(VibeTaskHandle { inner: task })
684    }
685
686    /// One-shot delayed task. The future is built lazily after the delay so
687    /// callers do not pay the cost of the work until it is actually due.
688    pub(crate) fn schedule_after<F, Fut>(
689        &self,
690        name: impl Into<String>,
691        delay: Duration,
692        builder: F,
693    ) -> Result<VibeTaskHandle, VibeEngineError>
694    where
695        F: FnOnce(VibeCancellationToken) -> Fut + Send + 'static,
696        Fut: Future<Output = ()> + Send + 'static,
697    {
698        let task = self.make_task(name.into(), VibeTaskKind::Delayed, VibeTaskPriority::Normal)?;
699        let registry = Arc::clone(&self.registry);
700        let task_for_run = Arc::clone(&task);
701        let token = task.token.clone();
702        self.handle.spawn(async move {
703            tokio::select! {
704                _ = token.cancelled() => {
705                    if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
706                        log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
707                    }
708                    if let Err(error) = registry.remove(task_for_run.id) {
709                        log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
710                    }
711                    return;
712                }
713                _ = tokio::time::sleep(delay) => {}
714            }
715            if token.is_cancelled() {
716                if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
717                    log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
718                }
719                if let Err(error) = registry.remove(task_for_run.id) {
720                    log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
721                }
722                return;
723            }
724            if let Err(error) = task_for_run.mark_started() {
725                log_e!("scheduler.schedule_after", DESC, format!("mark started failed: {error}"));
726                return;
727            }
728            let fut = Box::pin(builder(token.clone()));
729            let final_state = run_user_future(fut, &token).await;
730            if let Err(error) = task_for_run.finish(final_state) {
731                log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
732            }
733            if let Err(error) = registry.remove(task_for_run.id) {
734                log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
735            }
736        });
737        Ok(VibeTaskHandle { inner: task })
738    }
739
740    /// Periodic task. The builder is invoked every `period` until cancelled.
741    /// The first invocation occurs after waiting one period; this matches
742    /// `tokio::time::interval` semantics with `MissedTickBehavior::Delay`.
743    pub(crate) fn schedule_every<F, Fut>(
744        &self,
745        name: impl Into<String>,
746        period: Duration,
747        mut builder: F,
748    ) -> Result<VibeTaskHandle, VibeEngineError>
749    where
750        F: FnMut(VibeCancellationToken) -> Fut + Send + 'static,
751        Fut: Future<Output = ()> + Send + 'static,
752    {
753        let task = self.make_task(
754            name.into(),
755            VibeTaskKind::Periodic,
756            VibeTaskPriority::Normal,
757        )?;
758        let registry = Arc::clone(&self.registry);
759        let task_for_run = Arc::clone(&task);
760        let token = task.token.clone();
761        self.handle.spawn(async move {
762            if let Err(error) = task_for_run.mark_started() {
763                log_e!(
764                    "scheduler.schedule_every",
765                    DESC,
766                    format!("mark started failed: {error}")
767                );
768                return;
769            }
770            loop {
771                tokio::select! {
772                    _ = token.cancelled() => break,
773                    _ = tokio::time::sleep(period) => {}
774                }
775                if token.is_cancelled() {
776                    break;
777                }
778                let fut = Box::pin(builder(token.clone()));
779                let state = run_user_future(fut, &token).await;
780                if !matches!(state, VibeTaskState::Completed) {
781                    // Cancellation or panic terminates the periodic loop.
782                    if let Err(error) = task_for_run.finish(state) {
783                        log_e!(
784                            "scheduler.schedule_every",
785                            DESC,
786                            format!("finish failed: {error}")
787                        );
788                    }
789                    if let Err(error) = registry.remove(task_for_run.id) {
790                        log_e!(
791                            "scheduler.schedule_every",
792                            DESC,
793                            format!("registry remove failed: {error}")
794                        );
795                    }
796                    return;
797                }
798            }
799            if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
800                log_e!(
801                    "scheduler.schedule_every",
802                    DESC,
803                    format!("finish failed: {error}")
804                );
805            }
806            if let Err(error) = registry.remove(task_for_run.id) {
807                log_e!(
808                    "scheduler.schedule_every",
809                    DESC,
810                    format!("registry remove failed: {error}")
811                );
812            }
813        });
814        Ok(VibeTaskHandle { inner: task })
815    }
816
817    /// Trip every tracked task and drop the priority senders so the
818    /// dispatcher exits once already-queued work drains.
819    pub(crate) fn shutdown(&self) {
820        if let Err(error) = self.registry.cancel_all() {
821            log_e!(
822                "scheduler.shutdown",
823                DESC,
824                format!("cancel all failed: {error}")
825            );
826        }
827        if let Ok(mut guard) = self.senders.lock() {
828            *guard = None;
829        }
830    }
831}
832
833async fn run_user_future(
834    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
835    token: &VibeCancellationToken,
836) -> VibeTaskState {
837    use futures::future::FutureExt;
838    use std::panic::AssertUnwindSafe;
839    let outcome = AssertUnwindSafe(fut).catch_unwind().await;
840    match outcome {
841        Ok(()) => {
842            if token.is_cancelled() {
843                VibeTaskState::Cancelled
844            } else {
845                VibeTaskState::Completed
846            }
847        }
848        Err(payload) => {
849            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
850                (*s).to_string()
851            } else if let Some(s) = payload.downcast_ref::<String>() {
852                s.clone()
853            } else {
854                "unknown panic payload".to_string()
855            };
856            log_e!(
857                "scheduler.run_user_future",
858                DESC,
859                format!("scheduled task panicked: {msg}")
860            );
861            VibeTaskState::Failed
862        }
863    }
864}
865
#[cfg(test)]
mod tests {
    use super::*;

    /// Two tasks waiting on clones of the same token must both be released by
    /// a single `cancel()` call.
    #[tokio::test]
    async fn cancellation_token_resolves_for_concurrent_waiters() {
        let token = VibeCancellationToken::new();
        let waiters: Vec<_> = (0..2)
            .map(|_| {
                let shared = token.clone();
                tokio::spawn(async move { shared.cancelled().await })
            })
            .collect();
        // Give both waiters time to start waiting before tripping the token.
        tokio::time::sleep(Duration::from_millis(20)).await;
        token.cancel();
        for waiter in waiters {
            assert!(waiter.await.is_ok());
        }
        assert!(token.is_cancelled());
    }
}
884
#[cfg(test)]
mod strict_tests {
    use super::*;

    // The test source lives outside this module's file and is included
    // textually, so it compiles as part of this module and can reach its
    // private items. The path is built from the crate manifest directory.
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/test/unit/api/scheduler_tests.rs"));
}