juggle/utils/load.rs
1use alloc::rc::Rc;
2use core::fmt::{Debug, Formatter};
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use crate::utils::{TimerClock, TimingGroup};
7use crate::round::Ucw;
8
9/// Helper for equally dividing time slots across multiple tasks. Implements `Future`.
10///
11/// This struct can be used as a wrapper for tasks to ensure they have more-less equal amount of time
12/// per slot to execute. For more low-level control you can use [`TimingGroup`](struct.TimingGroup.html).
13///
14/// Each task has number of time slots assigned to it. Time slots divides time designated for this group
15/// into parts for each task. Some task can therefore have assigned more time than the others, e.g if
16/// you have 2 tasks, first with 1 time slot and second with 2, then let them run for 3 seconds, first
17/// task will be running 1 second from this time and second task - 2 seconds.
18///
19/// You can specify custom clock for measuring time in this group by implementing
20/// [`TimerClock`](trait.TimerClock.html) trait and passing it to [`with`](#method.with) method.
21///
22/// # Examples
23/// ```
24/// # extern crate std;
25/// use juggle::*;
26/// use juggle::utils::*;
27/// use std::time::{Duration, Instant};
28/// # use std::thread::sleep;
29/// # struct StdTimerClock;
30/// # impl TimerClock for StdTimerClock {
31/// # type Duration = Duration;
32/// # type Instant = std::time::Instant;
33/// # fn start(&self) -> Self::Instant { Self::Instant::now() }
34/// # fn stop(&self, start: Self::Instant) -> Self::Duration { Self::Instant::now() - start }
35/// # }
36/// # fn do_some_work(){ sleep(Duration::from_millis(1))}
37/// # fn do_more_demanding_work(){ sleep(Duration::from_millis(1))}
38///
39/// async fn task_single_time(){
40/// loop{
41/// do_some_work();
42/// yield_once!();
43/// }
44/// }
45/// async fn task_double_time(){
46/// loop{
47/// do_more_demanding_work();
48/// yield_once!();
49/// }
50/// }
51///
52/// let wheel = Wheel::new();
53/// // create group with one task in it
54/// let single = LoadBalance::with(StdTimerClock,1,task_single_time());
55/// // add other task to group with 2 time slots
56/// let double = single.insert(2,task_double_time());
57/// // spawn tasks in Wheel
58/// let ids = wheel.handle().spawn(SpawnParams::default(),single).unwrap();
59/// let idd = wheel.handle().spawn(SpawnParams::default(),double).unwrap();
60/// // add control to cancel tasks after 1 second.
61/// let handle = wheel.handle().clone();
62/// wheel.handle().spawn(SpawnParams::default(),async move {
63/// let start = Instant::now();
64/// yield_while!(start.elapsed() < Duration::from_millis(1000));
65/// handle.cancel(ids);
66/// handle.cancel(idd);
67/// }).unwrap();
68/// //run scheduler
69/// smol::block_on(wheel).unwrap();
70/// ```
71pub struct LoadBalance<F: Future, C: TimerClock> {
72 record: Registered<C>,
73 future: F,
74}
75
76struct Registered<C: TimerClock> {
77 index: usize,
78 group: Rc<(Ucw<TimingGroup<C::Duration>>, C)>, //only struct with drop (see: unregister_and_get)
79}
80
81impl<C: TimerClock> Registered<C> {
82 #[inline(always)]
83 fn do_unregister(&self){ self.group.0.borrow_mut().remove(self.index); }
84 #[inline]
85 fn unregister_and_get(self) -> Rc<(Ucw<TimingGroup<C::Duration>>, C)> {
86 //equivalent to running drop
87 self.do_unregister();
88 //SAFETY: we just run destructor code, now perform moving value out, and suppress
89 //real destructor.
90 unsafe {
91 let rc = core::ptr::read(&self.group as *const _);//take by force! (only dropable struct)
92 core::mem::forget(self); //you'd better not drop it! (if not this line, we would have double drop)
93 rc
94 }
95 }
96}
97
98impl<C: TimerClock> Drop for Registered<C> {
99 fn drop(&mut self) {
100 self.do_unregister();
101 }
102}
103
104impl<F: Future, C: TimerClock> LoadBalance<F, C> {
105 /// Create new load balancing group with one future in it. Assigns specific
106 /// number of time slots to future and clock used as measuring time source for this group.
107 /// For usage with `std` library feature, you can use [`StdTimerClock`](struct.StdTimerClock.html).
108 pub fn with(clock: C, prop: u16, future: F) -> Self {
109 let mut group = TimingGroup::new();
110 let key = group.insert(prop);
111 Self {
112 record: Registered {
113 index: key,
114 group: Rc::new((Ucw::new(group), clock)),
115 },
116 future,
117 }
118 }
119 /// Insert future to this load balancing group with given number of time slots assigned. Returned wrapper
120 /// also belongs to the same group.
121 pub fn insert<G>(&self, prop: u16, future: G) -> LoadBalance<G, C> where G: Future {
122 let index = self.record.group.0.borrow_mut().insert(prop);
123 LoadBalance {
124 record: Registered {
125 index,
126 group: self.record.group.clone(),//clone rc
127 },
128 future,
129 }
130 }
131 /// Returns number of tasks registered in this group.
132 pub fn count(&self) -> usize { self.record.group.0.borrow().count() }
133
134 /// Remove this wrapper from load balancing group, if this task is last then `Some` is returned
135 /// with ownership of clock. Otherwise returns `None`.
136 pub fn remove(self) -> Option<C> {
137 let group = self.record.unregister_and_get();
138 Rc::try_unwrap(group).ok().map(|p| p.1)
139 }
140}
141
142impl<F: Future, C: TimerClock> Future for LoadBalance<F, C> {
143 type Output = F::Output;
144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
145 if !self.record.group.0.borrow().can_execute(self.record.index) {
146 cx.waker().wake_by_ref();
147 return Poll::Pending;
148 }
149
150 let start = self.record.group.1.start(); //start measure
151
152 //SAFETY: we just map pinned mutable references from inside of a struct
153 let future = unsafe{ self.as_mut().map_unchecked_mut(|v|&mut v.future) };
154
155 let res = future.poll(cx);
156 let dur = self.record.group.1.stop(start); //end measure
157 self.record.group.0.borrow_mut().update_duration(self.record.index, dur);
158 return res;
159 }
160}
161
162impl<F: Future + Debug, C: TimerClock + Debug> Debug for LoadBalance<F, C> {
163 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
164 f.debug_struct("LoadBalance").field("future", &self.future) //pinned future by shared ref
165 .field("slots", &self.record.group.0.borrow().get_slot_count(self.record.index).unwrap())
166 .field("clock", &self.record.group.1).finish()
167 }
168}
169