Skip to main content

juggle/utils/
load.rs

1use alloc::rc::Rc;
2use core::fmt::{Debug, Formatter};
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use crate::utils::{TimerClock, TimingGroup};
7use crate::round::Ucw;
8
9/// Helper for equally dividing time slots across multiple tasks. Implements `Future`.
10///
11/// This struct can be used as a wrapper for tasks to ensure they have more-less equal amount of time
12/// per slot to execute. For more low-level control you can use [`TimingGroup`](struct.TimingGroup.html).
13///
14/// Each task has number of time slots assigned to it. Time slots divides time designated for this group
15/// into parts for each task. Some task can therefore have assigned more time than the others, e.g if
16/// you have 2 tasks, first with 1 time slot and second with 2, then let them run for 3 seconds, first
17/// task will be running 1 second from this time and second task - 2 seconds.
18///
19/// You can specify custom clock for measuring time in this group by implementing
20/// [`TimerClock`](trait.TimerClock.html) trait and passing it to [`with`](#method.with) method.
21///
22/// # Examples
23/// ```
24/// # extern crate std;
25/// use juggle::*;
26/// use juggle::utils::*;
27/// use std::time::{Duration, Instant};
28/// # use std::thread::sleep;
29/// # struct StdTimerClock;
30/// # impl TimerClock for StdTimerClock {
31/// #   type Duration = Duration;
32/// #   type Instant = std::time::Instant;
33/// #   fn start(&self) -> Self::Instant { Self::Instant::now() }
34/// #   fn stop(&self, start: Self::Instant) -> Self::Duration { Self::Instant::now() - start }
35/// # }
36/// # fn do_some_work(){ sleep(Duration::from_millis(1))}
37/// # fn do_more_demanding_work(){ sleep(Duration::from_millis(1))}
38///
39/// async fn task_single_time(){
40///     loop{
41///         do_some_work();
42///         yield_once!();
43///     }
44/// }
45/// async fn task_double_time(){
46///     loop{
47///         do_more_demanding_work();
48///         yield_once!();
49///     }
50/// }
51///
52/// let wheel = Wheel::new();
53/// // create group with one task in it
54/// let single = LoadBalance::with(StdTimerClock,1,task_single_time());
55/// // add other task to group with 2 time slots
56/// let double = single.insert(2,task_double_time());
57/// // spawn tasks in Wheel
58/// let ids = wheel.handle().spawn(SpawnParams::default(),single).unwrap();
59/// let idd = wheel.handle().spawn(SpawnParams::default(),double).unwrap();
60/// // add control to cancel tasks after 1 second.
61/// let handle = wheel.handle().clone();
62/// wheel.handle().spawn(SpawnParams::default(),async move {
63///     let start = Instant::now();
64///     yield_while!(start.elapsed() < Duration::from_millis(1000));
65///     handle.cancel(ids);
66///     handle.cancel(idd);
67/// }).unwrap();
68/// //run scheduler
69/// smol::block_on(wheel).unwrap();
70/// ```
71pub struct LoadBalance<F: Future, C: TimerClock> {
72    record: Registered<C>,
73    future: F,
74}
75
76struct Registered<C: TimerClock> {
77    index: usize,
78    group: Rc<(Ucw<TimingGroup<C::Duration>>, C)>, //only struct with drop (see: unregister_and_get)
79}
80
81impl<C: TimerClock> Registered<C> {
82    #[inline(always)]
83    fn do_unregister(&self){ self.group.0.borrow_mut().remove(self.index); }
84    #[inline]
85    fn unregister_and_get(self) -> Rc<(Ucw<TimingGroup<C::Duration>>, C)> {
86        //equivalent to running drop
87        self.do_unregister();
88        //SAFETY: we just run destructor code, now perform moving value out, and suppress
89        //real destructor.
90        unsafe {
91            let rc = core::ptr::read(&self.group as *const _);//take by force! (only dropable struct)
92            core::mem::forget(self); //you'd better not drop it! (if not this line, we would have double drop)
93            rc
94        }
95    }
96}
97
98impl<C: TimerClock> Drop for Registered<C> {
99    fn drop(&mut self) {
100        self.do_unregister();
101    }
102}
103
104impl<F: Future, C: TimerClock> LoadBalance<F, C> {
105    /// Create new load balancing group with one future in it. Assigns specific
106    /// number of time slots to future and clock used as measuring time source for this group.
107    /// For usage with `std` library feature, you can use [`StdTimerClock`](struct.StdTimerClock.html).
108    pub fn with(clock: C, prop: u16, future: F) -> Self {
109        let mut group = TimingGroup::new();
110        let key = group.insert(prop);
111        Self {
112            record: Registered {
113                index: key,
114                group: Rc::new((Ucw::new(group), clock)),
115            },
116            future,
117        }
118    }
119    /// Insert future to this load balancing group with given number of time slots assigned. Returned wrapper
120    /// also belongs to the same group.
121    pub fn insert<G>(&self, prop: u16, future: G) -> LoadBalance<G, C> where G: Future {
122        let index = self.record.group.0.borrow_mut().insert(prop);
123        LoadBalance {
124            record: Registered {
125                index,
126                group: self.record.group.clone(),//clone rc
127            },
128            future,
129        }
130    }
131    /// Returns number of tasks registered in this group.
132    pub fn count(&self) -> usize { self.record.group.0.borrow().count() }
133
134    /// Remove this wrapper from load balancing group, if this task is last then `Some` is returned
135    /// with ownership of clock. Otherwise returns `None`.
136    pub fn remove(self) -> Option<C> {
137        let group = self.record.unregister_and_get();
138        Rc::try_unwrap(group).ok().map(|p| p.1)
139    }
140}
141
142impl<F: Future, C: TimerClock> Future for LoadBalance<F, C> {
143    type Output = F::Output;
144    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
145        if !self.record.group.0.borrow().can_execute(self.record.index) {
146            cx.waker().wake_by_ref();
147            return Poll::Pending;
148        }
149
150        let start = self.record.group.1.start(); //start measure
151
152        //SAFETY: we just map pinned mutable references from inside of a struct
153        let future = unsafe{ self.as_mut().map_unchecked_mut(|v|&mut v.future) };
154
155        let res = future.poll(cx);
156        let dur = self.record.group.1.stop(start); //end measure
157        self.record.group.0.borrow_mut().update_duration(self.record.index, dur);
158        return res;
159    }
160}
161
162impl<F: Future + Debug, C: TimerClock + Debug> Debug for LoadBalance<F, C> {
163    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
164        f.debug_struct("LoadBalance").field("future", &self.future) //pinned future by shared ref
165            .field("slots", &self.record.group.0.borrow().get_slot_count(self.record.index).unwrap())
166            .field("clock", &self.record.group.1).finish()
167    }
168}
169