1use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
7use crate::api::engine_executor::VibeEngineTask;
8use crate::log::log_def::DESC;
9use crate::{log_e, platform};
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16use tokio::runtime::Handle;
17use tokio::sync::mpsc::{channel, Sender};
18use tokio::sync::Notify;
19
20fn scheduler_lock_error(context: impl Into<String>) -> VibeEngineError {
21 VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError).with_context(context)
22}
23
/// Priority lane a posted task is queued on.
///
/// The discriminants are explicit because the scheduler converts a priority
/// to an index into its three-element sender array.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum VibeTaskPriority {
    /// First lane polled by the dispatcher.
    High = 0,
    /// Lane used by default for delayed and periodic tasks.
    Normal = 1,
    /// Last lane polled by the dispatcher.
    Low = 2,
}

impl VibeTaskPriority {
    /// Lower-case lane name, used when formatting log messages.
    fn as_str(self) -> &'static str {
        match self {
            Self::High => "high",
            Self::Normal => "normal",
            Self::Low => "low",
        }
    }
}
45
/// How a task was handed to the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VibeTaskKind {
    /// Assigned by `post_with_priority`.
    Once,
    /// Assigned by `schedule_after`.
    Delayed,
    /// Assigned by `schedule_every`.
    Periodic,
}
56
/// Lifecycle state of a scheduled task.
///
/// `Completed`, `Cancelled` and `Failed` are terminal: `TaskInner::set_state`
/// refuses to move a task out of any of them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VibeTaskState {
    /// Initial state given to every task by `make_task`.
    Pending,
    /// Set by `TaskInner::mark_started` just before the user future runs.
    Running,
    /// The user future returned and the token was not cancelled.
    Completed,
    /// Cancellation was requested through the task's token.
    Cancelled,
    /// The user future panicked, or the task could not be queued on its lane.
    Failed,
}
71
72#[derive(Clone)]
78pub struct VibeCancellationToken {
79 flag: Arc<AtomicBool>,
80 notify: Arc<Notify>,
81}
82
83impl VibeCancellationToken {
84 pub fn new() -> Self {
101 Self {
102 flag: Arc::new(AtomicBool::new(false)),
103 notify: Arc::new(Notify::new()),
104 }
105 }
106
107 pub fn cancel(&self) {
114 if !self.flag.swap(true, Ordering::AcqRel) {
115 self.notify.notify_waiters();
116 }
117 }
118
119 pub fn is_cancelled(&self) -> bool {
125 self.flag.load(Ordering::Acquire)
126 }
127
128 pub async fn cancelled(&self) {
134 loop {
135 if self.is_cancelled() {
136 return;
137 }
138 let waiter = self.notify.notified();
139 if self.is_cancelled() {
140 return;
141 }
142 waiter.await;
143 if self.is_cancelled() {
144 return;
145 }
146 }
147 }
148}
149
150impl Default for VibeCancellationToken {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
/// Milestones of a task's life, each taken from `platform::now()`.
///
/// NOTE(review): the `_ms` suffix suggests milliseconds; confirm against
/// `platform::now()`.
#[derive(Debug, Clone)]
struct TaskTimestamps {
    /// Recorded when the task is created.
    created_at_ms: i64,
    /// Recorded the first time the task is marked as started.
    started_at_ms: Option<i64>,
    /// Recorded the first time the task is finished.
    finished_at_ms: Option<i64>,
}
162
163struct TaskInner {
164 id: u64,
165 name: String,
166 kind: VibeTaskKind,
167 priority: VibeTaskPriority,
168 token: VibeCancellationToken,
169 state: Mutex<VibeTaskState>,
170 timestamps: Mutex<TaskTimestamps>,
171 finished: Notify,
172}
173
174impl TaskInner {
175 fn snapshot(&self) -> Result<VibeTaskInfo, VibeEngineError> {
176 let state = *self
177 .state
178 .lock()
179 .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
180 let ts = self
181 .timestamps
182 .lock()
183 .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?
184 .clone();
185 Ok(VibeTaskInfo {
186 id: self.id,
187 name: self.name.clone(),
188 kind: self.kind,
189 priority: self.priority,
190 state,
191 created_at_ms: ts.created_at_ms,
192 started_at_ms: ts.started_at_ms,
193 finished_at_ms: ts.finished_at_ms,
194 })
195 }
196
197 fn set_state(&self, new_state: VibeTaskState) -> Result<(), VibeEngineError> {
198 let mut guard = self
199 .state
200 .lock()
201 .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
202 if matches!(
203 *guard,
204 VibeTaskState::Completed | VibeTaskState::Cancelled | VibeTaskState::Failed
205 ) {
206 return Ok(());
207 }
208 *guard = new_state;
209 Ok(())
210 }
211
212 fn mark_started(&self) -> Result<(), VibeEngineError> {
213 self.set_state(VibeTaskState::Running)?;
214 let mut ts = self
215 .timestamps
216 .lock()
217 .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?;
218 if ts.started_at_ms.is_none() {
219 ts.started_at_ms = Some(platform::now());
220 }
221 Ok(())
222 }
223
224 fn finish(&self, final_state: VibeTaskState) -> Result<(), VibeEngineError> {
225 {
226 let mut guard = self
227 .state
228 .lock()
229 .map_err(|_| scheduler_lock_error("task state lock poisoned"))?;
230 *guard = final_state;
231 }
232 {
233 let mut ts = self
234 .timestamps
235 .lock()
236 .map_err(|_| scheduler_lock_error("task timestamps lock poisoned"))?;
237 if ts.finished_at_ms.is_none() {
238 ts.finished_at_ms = Some(platform::now());
239 }
240 }
241 self.finished.notify_waiters();
242 Ok(())
243 }
244}
245
246#[derive(Clone, Debug)]
248pub struct VibeTaskInfo {
249 pub id: u64,
251 pub name: String,
253 pub kind: VibeTaskKind,
255 pub priority: VibeTaskPriority,
257 pub state: VibeTaskState,
259 pub created_at_ms: i64,
261 pub started_at_ms: Option<i64>,
263 pub finished_at_ms: Option<i64>,
265}
266
267#[derive(Clone)]
271pub struct VibeTaskHandle {
272 inner: Arc<TaskInner>,
273}
274
275impl VibeTaskHandle {
276 pub fn id(&self) -> u64 {
282 self.inner.id
283 }
284
285 pub fn name(&self) -> &str {
291 &self.inner.name
292 }
293
294 pub fn kind(&self) -> VibeTaskKind {
300 self.inner.kind
301 }
302
303 pub fn priority(&self) -> VibeTaskPriority {
309 self.inner.priority
310 }
311
312 pub fn state(&self) -> Result<VibeTaskState, VibeEngineError> {
318 self.inner
319 .state
320 .lock()
321 .map(|guard| *guard)
322 .map_err(|_| scheduler_lock_error("task state lock poisoned"))
323 }
324
325 pub fn token(&self) -> VibeCancellationToken {
331 self.inner.token.clone()
332 }
333
334 pub fn info(&self) -> Result<VibeTaskInfo, VibeEngineError> {
340 self.inner.snapshot()
341 }
342
343 pub fn cancel(&self) {
352 self.inner.token.cancel();
353 }
354
355 pub fn is_finished(&self) -> Result<bool, VibeEngineError> {
361 Ok(matches!(
362 self.state()?,
363 VibeTaskState::Completed | VibeTaskState::Cancelled | VibeTaskState::Failed
364 ))
365 }
366
367 pub async fn join(&self) -> Result<(), VibeEngineError> {
370 loop {
371 match self.state()? {
372 VibeTaskState::Completed => return Ok(()),
373 VibeTaskState::Cancelled => {
374 return Err(VibeEngineError::from_error_code(
375 VibeEngineErrorCode::Cancelled,
376 ));
377 }
378 VibeTaskState::Failed => {
379 return Err(VibeEngineError::from_error_code(
380 VibeEngineErrorCode::InternalError,
381 ));
382 }
383 VibeTaskState::Pending | VibeTaskState::Running => {}
384 }
385 let notified = self.inner.finished.notified();
386 if self.is_finished()? {
387 continue;
388 }
389 notified.await;
390 }
391 }
392}
393
394#[derive(Clone)]
396pub struct VibeTaskPanel {
397 registry: Arc<TaskRegistry>,
398}
399
400impl VibeTaskPanel {
401 pub fn list(&self) -> Result<Vec<VibeTaskInfo>, VibeEngineError> {
420 self.registry.snapshot()
421 }
422
423 pub fn count(&self) -> Result<usize, VibeEngineError> {
429 self.registry.len()
430 }
431}
432
433struct TaskRegistry {
434 tasks: Mutex<HashMap<u64, Arc<TaskInner>>>,
435}
436
437impl TaskRegistry {
438 fn new() -> Self {
439 Self {
440 tasks: Mutex::new(HashMap::new()),
441 }
442 }
443
444 fn insert(&self, task: Arc<TaskInner>) -> Result<(), VibeEngineError> {
445 self.tasks
446 .lock()
447 .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
448 .insert(task.id, task);
449 Ok(())
450 }
451
452 fn remove(&self, id: u64) -> Result<(), VibeEngineError> {
453 self.tasks
454 .lock()
455 .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
456 .remove(&id);
457 Ok(())
458 }
459
460 fn snapshot(&self) -> Result<Vec<VibeTaskInfo>, VibeEngineError> {
461 let guard = self
462 .tasks
463 .lock()
464 .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?;
465 guard.values().map(|t| t.snapshot()).collect()
466 }
467
468 fn len(&self) -> Result<usize, VibeEngineError> {
469 Ok(self
470 .tasks
471 .lock()
472 .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?
473 .len())
474 }
475
476 fn cancel_all(&self) -> Result<Vec<Arc<TaskInner>>, VibeEngineError> {
477 let guard = self
478 .tasks
479 .lock()
480 .map_err(|_| scheduler_lock_error("task registry lock poisoned"))?;
481 let snapshot: Vec<Arc<TaskInner>> = guard.values().cloned().collect();
482 for task in &snapshot {
483 task.token.cancel();
484 }
485 Ok(snapshot)
486 }
487}
488
489pub(crate) struct VibeTaskScheduler {
491 handle: Handle,
492 registry: Arc<TaskRegistry>,
493 next_id: AtomicU64,
494 senders: Mutex<Option<[Sender<VibeEngineTask>; 3]>>,
495}
496
497impl VibeTaskScheduler {
498 pub(crate) fn new(handle: Handle, capacity: usize) -> Arc<Self> {
499 let cap = capacity.max(1);
500 let (high_tx, mut high_rx) = channel::<VibeEngineTask>(cap);
501 let (normal_tx, mut normal_rx) = channel::<VibeEngineTask>(cap);
502 let (low_tx, mut low_rx) = channel::<VibeEngineTask>(cap);
503
504 handle.spawn(async move {
510 loop {
511 tokio::select! {
512 biased;
513 maybe = high_rx.recv() => {
514 match maybe {
515 Some(task) => task.await,
516 None => {
517 while let Some(task) = normal_rx.recv().await {
520 task.await;
521 }
522 while let Some(task) = low_rx.recv().await {
523 task.await;
524 }
525 break;
526 }
527 }
528 }
529 maybe = normal_rx.recv() => {
530 if let Some(task) = maybe {
531 task.await;
532 }
533 }
534 maybe = low_rx.recv() => {
535 if let Some(task) = maybe {
536 task.await;
537 }
538 }
539 }
540 }
541 });
542
543 Arc::new(Self {
544 handle,
545 registry: Arc::new(TaskRegistry::new()),
546 next_id: AtomicU64::new(1),
547 senders: Mutex::new(Some([high_tx, normal_tx, low_tx])),
548 })
549 }
550
551 pub(crate) fn panel(self: &Arc<Self>) -> VibeTaskPanel {
552 VibeTaskPanel {
553 registry: Arc::clone(&self.registry),
554 }
555 }
556
557 fn make_task(
558 &self,
559 name: String,
560 kind: VibeTaskKind,
561 priority: VibeTaskPriority,
562 ) -> Result<Arc<TaskInner>, VibeEngineError> {
563 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
564 let inner = Arc::new(TaskInner {
565 id,
566 name,
567 kind,
568 priority,
569 token: VibeCancellationToken::new(),
570 state: Mutex::new(VibeTaskState::Pending),
571 timestamps: Mutex::new(TaskTimestamps {
572 created_at_ms: platform::now(),
573 started_at_ms: None,
574 finished_at_ms: None,
575 }),
576 finished: Notify::new(),
577 });
578 self.registry.insert(Arc::clone(&inner))?;
579 Ok(inner)
580 }
581
582 fn priority_sender(
583 &self,
584 priority: VibeTaskPriority,
585 ) -> Result<Sender<VibeEngineTask>, VibeEngineError> {
586 let guard = self
587 .senders
588 .lock()
589 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
590 let array = guard
591 .as_ref()
592 .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::PostError))?;
593 Ok(array[priority as usize].clone())
594 }
595
596 pub(crate) fn post_with_priority<F>(
598 &self,
599 name: impl Into<String>,
600 priority: VibeTaskPriority,
601 future: F,
602 ) -> Result<VibeTaskHandle, VibeEngineError>
603 where
604 F: Future<Output = ()> + Send + 'static,
605 {
606 let task = self.make_task(name.into(), VibeTaskKind::Once, priority)?;
607 let registry = Arc::clone(&self.registry);
608 let task_for_run = Arc::clone(&task);
609 let token = task.token.clone();
610 let wrapped: VibeEngineTask = Box::pin(async move {
611 if token.is_cancelled() {
612 if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
613 log_e!(
614 "scheduler.post_with_priority",
615 DESC,
616 format!("finish failed: {error}")
617 );
618 }
619 if let Err(error) = registry.remove(task_for_run.id) {
620 log_e!(
621 "scheduler.post_with_priority",
622 DESC,
623 format!("registry remove failed: {error}")
624 );
625 }
626 return;
627 }
628 if let Err(error) = task_for_run.mark_started() {
629 log_e!(
630 "scheduler.post_with_priority",
631 DESC,
632 format!("mark started failed: {error}")
633 );
634 return;
635 }
636 let final_state = run_user_future(Box::pin(future), &token).await;
637 if let Err(error) = task_for_run.finish(final_state) {
638 log_e!(
639 "scheduler.post_with_priority",
640 DESC,
641 format!("finish failed: {error}")
642 );
643 }
644 if let Err(error) = registry.remove(task_for_run.id) {
645 log_e!(
646 "scheduler.post_with_priority",
647 DESC,
648 format!("registry remove failed: {error}")
649 );
650 }
651 });
652
653 let sender = self.priority_sender(priority)?;
654 let task_for_send = Arc::clone(&task);
655 let registry_for_send = Arc::clone(&self.registry);
656 if let Err(err) = sender.try_send(wrapped) {
660 log_e!(
661 "scheduler.post_with_priority",
662 DESC,
663 format!("send to priority lane {} failed: {err}", priority.as_str())
664 );
665 if let Err(error) = task_for_send.finish(VibeTaskState::Failed) {
666 log_e!(
667 "scheduler.post_with_priority",
668 DESC,
669 format!("finish failed: {error}")
670 );
671 }
672 if let Err(error) = registry_for_send.remove(task_for_send.id) {
673 log_e!(
674 "scheduler.post_with_priority",
675 DESC,
676 format!("registry remove failed: {error}")
677 );
678 }
679 return Err(VibeEngineError::from_error_code(
680 VibeEngineErrorCode::PostError,
681 ));
682 }
683 Ok(VibeTaskHandle { inner: task })
684 }
685
686 pub(crate) fn schedule_after<F, Fut>(
689 &self,
690 name: impl Into<String>,
691 delay: Duration,
692 builder: F,
693 ) -> Result<VibeTaskHandle, VibeEngineError>
694 where
695 F: FnOnce(VibeCancellationToken) -> Fut + Send + 'static,
696 Fut: Future<Output = ()> + Send + 'static,
697 {
698 let task = self.make_task(name.into(), VibeTaskKind::Delayed, VibeTaskPriority::Normal)?;
699 let registry = Arc::clone(&self.registry);
700 let task_for_run = Arc::clone(&task);
701 let token = task.token.clone();
702 self.handle.spawn(async move {
703 tokio::select! {
704 _ = token.cancelled() => {
705 if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
706 log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
707 }
708 if let Err(error) = registry.remove(task_for_run.id) {
709 log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
710 }
711 return;
712 }
713 _ = tokio::time::sleep(delay) => {}
714 }
715 if token.is_cancelled() {
716 if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
717 log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
718 }
719 if let Err(error) = registry.remove(task_for_run.id) {
720 log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
721 }
722 return;
723 }
724 if let Err(error) = task_for_run.mark_started() {
725 log_e!("scheduler.schedule_after", DESC, format!("mark started failed: {error}"));
726 return;
727 }
728 let fut = Box::pin(builder(token.clone()));
729 let final_state = run_user_future(fut, &token).await;
730 if let Err(error) = task_for_run.finish(final_state) {
731 log_e!("scheduler.schedule_after", DESC, format!("finish failed: {error}"));
732 }
733 if let Err(error) = registry.remove(task_for_run.id) {
734 log_e!("scheduler.schedule_after", DESC, format!("registry remove failed: {error}"));
735 }
736 });
737 Ok(VibeTaskHandle { inner: task })
738 }
739
740 pub(crate) fn schedule_every<F, Fut>(
744 &self,
745 name: impl Into<String>,
746 period: Duration,
747 mut builder: F,
748 ) -> Result<VibeTaskHandle, VibeEngineError>
749 where
750 F: FnMut(VibeCancellationToken) -> Fut + Send + 'static,
751 Fut: Future<Output = ()> + Send + 'static,
752 {
753 let task = self.make_task(
754 name.into(),
755 VibeTaskKind::Periodic,
756 VibeTaskPriority::Normal,
757 )?;
758 let registry = Arc::clone(&self.registry);
759 let task_for_run = Arc::clone(&task);
760 let token = task.token.clone();
761 self.handle.spawn(async move {
762 if let Err(error) = task_for_run.mark_started() {
763 log_e!(
764 "scheduler.schedule_every",
765 DESC,
766 format!("mark started failed: {error}")
767 );
768 return;
769 }
770 loop {
771 tokio::select! {
772 _ = token.cancelled() => break,
773 _ = tokio::time::sleep(period) => {}
774 }
775 if token.is_cancelled() {
776 break;
777 }
778 let fut = Box::pin(builder(token.clone()));
779 let state = run_user_future(fut, &token).await;
780 if !matches!(state, VibeTaskState::Completed) {
781 if let Err(error) = task_for_run.finish(state) {
783 log_e!(
784 "scheduler.schedule_every",
785 DESC,
786 format!("finish failed: {error}")
787 );
788 }
789 if let Err(error) = registry.remove(task_for_run.id) {
790 log_e!(
791 "scheduler.schedule_every",
792 DESC,
793 format!("registry remove failed: {error}")
794 );
795 }
796 return;
797 }
798 }
799 if let Err(error) = task_for_run.finish(VibeTaskState::Cancelled) {
800 log_e!(
801 "scheduler.schedule_every",
802 DESC,
803 format!("finish failed: {error}")
804 );
805 }
806 if let Err(error) = registry.remove(task_for_run.id) {
807 log_e!(
808 "scheduler.schedule_every",
809 DESC,
810 format!("registry remove failed: {error}")
811 );
812 }
813 });
814 Ok(VibeTaskHandle { inner: task })
815 }
816
817 pub(crate) fn shutdown(&self) {
820 if let Err(error) = self.registry.cancel_all() {
821 log_e!(
822 "scheduler.shutdown",
823 DESC,
824 format!("cancel all failed: {error}")
825 );
826 }
827 if let Ok(mut guard) = self.senders.lock() {
828 *guard = None;
829 }
830 }
831}
832
833async fn run_user_future(
834 fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
835 token: &VibeCancellationToken,
836) -> VibeTaskState {
837 use futures::future::FutureExt;
838 use std::panic::AssertUnwindSafe;
839 let outcome = AssertUnwindSafe(fut).catch_unwind().await;
840 match outcome {
841 Ok(()) => {
842 if token.is_cancelled() {
843 VibeTaskState::Cancelled
844 } else {
845 VibeTaskState::Completed
846 }
847 }
848 Err(payload) => {
849 let msg = if let Some(s) = payload.downcast_ref::<&str>() {
850 (*s).to_string()
851 } else if let Some(s) = payload.downcast_ref::<String>() {
852 s.clone()
853 } else {
854 "unknown panic payload".to_string()
855 };
856 log_e!(
857 "scheduler.run_user_future",
858 DESC,
859 format!("scheduled task panicked: {msg}")
860 );
861 VibeTaskState::Failed
862 }
863 }
864}
865
#[cfg(test)]
mod tests {
    use super::*;

    /// Two tasks parked in `cancelled()` must both resolve after a single
    /// `cancel()` call.
    #[tokio::test]
    async fn cancellation_token_resolves_for_concurrent_waiters() {
        let token = VibeCancellationToken::new();
        let waiters: Vec<_> = (0..2)
            .map(|_| {
                let clone = token.clone();
                tokio::spawn(async move { clone.cancelled().await })
            })
            .collect();
        // Give both waiters time to park before cancelling.
        tokio::time::sleep(Duration::from_millis(20)).await;
        token.cancel();
        for waiter in waiters {
            assert!(waiter.await.is_ok());
        }
        assert!(token.is_cancelled());
    }
}
884
// Pulls the out-of-tree unit tests into this module so they can reach the
// private items of this file through `super::*`.
#[cfg(test)]
mod strict_tests {
    use super::*;

    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/test/unit/api/scheduler_tests.rs"));
}