Skip to main content

okee_wheel_timer/keyed_time_wheel/
core.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3
4use crate::event::{Event, EventId};
5use crate::time_wheel::{HashedWheelTimer, TimeWheelError};
6
7use super::types::{KeyedScheduleResult, KeyedUpdateResult};
8
9#[derive(Debug)]
10/// Keyed wrapper over [`HashedWheelTimer`].
11///
12/// A dedup key is produced by `dedup_key_fn`.
13/// Scheduling/updating an event with an existing key replaces the previous one.
14pub struct KeyedHashedWheelTimer<T, K, F>
15where
16    K: Eq + Hash + Clone,
17    F: Fn(&T) -> K,
18{
19    dedup_key_fn: F,
20    by_key: HashMap<K, EventId>,
21    wheel: HashedWheelTimer<T>,
22}
23
24impl<T, K, F> KeyedHashedWheelTimer<T, K, F>
25where
26    K: Eq + Hash + Clone,
27    F: Fn(&T) -> K,
28{
29    /// Creates a keyed timer.
30    pub fn new(buckets_num: usize, dedup_key_fn: F) -> Self {
31        Self {
32            wheel: HashedWheelTimer::new(buckets_num),
33            dedup_key_fn,
34            by_key: HashMap::new(),
35        }
36    }
37
38    /// Returns total number of scheduled events.
39    pub fn count_all(&self) -> usize {
40        self.wheel.count_all()
41    }
42
43    /// Returns number of events in bucket by index.
44    pub fn count_in_bucket(&self, bucket_index: usize) -> Result<usize, TimeWheelError> {
45        self.wheel.count_in_bucket(bucket_index)
46    }
47
48    /// Returns `true` when no events are scheduled.
49    pub fn is_empty(&self) -> bool {
50        self.wheel.is_empty()
51    }
52
53    /// Returns `true` when bucket contains no events.
54    pub fn is_empty_bucket(&self, bucket_index: usize) -> Result<bool, TimeWheelError> {
55        self.wheel.is_empty_bucket(bucket_index)
56    }
57
58    /// Returns `true` if current tick still has events to pop.
59    pub fn has_events_in_current_tick(&self) -> bool {
60        self.wheel.has_events_in_current_tick()
61    }
62
63    /// Returns current absolute tick.
64    pub fn curr_tick(&self) -> u64 {
65        self.wheel.curr_tick()
66    }
67
68    /// Returns current bucket index.
69    pub fn curr_bucket(&self) -> usize {
70        self.wheel.curr_bucket()
71    }
72
73    /// Returns current wave (`delta_tick`) in the current tick.
74    pub fn curr_delta_tick(&self) -> u64 {
75        self.wheel.curr_delta_tick()
76    }
77
78    /// Returns latest assigned sequence ID.
79    pub fn curr_seq_id(&self) -> EventId {
80        self.wheel.curr_seq_id()
81    }
82
83    /// Advances timer by one tick.
84    pub fn step(&mut self) {
85        self.wheel.step()
86    }
87
88    /// Clears all events and key index, then resets timer state.
89    pub fn reset(&mut self) {
90        self.wheel.reset();
91        self.by_key.clear();
92    }
93
94    /// Returns `true` if event with ID exists.
95    pub fn contains_by_id(&self, id: EventId) -> bool {
96        self.wheel.contains(id)
97    }
98
99    /// Returns `true` if key exists.
100    pub fn contains_by_key(&self, key: &K) -> bool {
101        self.by_key.contains_key(key)
102    }
103
104    /// Returns event ID by key.
105    pub fn id_by_key(&self, key: &K) -> Option<EventId> {
106        self.by_key.get(key).copied()
107    }
108
109    /// Returns event by ID.
110    pub fn get(&self, id: EventId) -> Option<&Event<T>> {
111        self.wheel.get(id)
112    }
113
114    /// Returns event by key.
115    pub fn get_by_key(&self, key: &K) -> Option<&Event<T>> {
116        let id = self.id_by_key(key)?;
117        self.wheel.get(id)
118    }
119
120    /// Schedules an event.
121    ///
122    /// If key already exists, previous event is replaced.
123    pub fn schedule(&mut self, on_tick: u64, data: T) -> KeyedScheduleResult {
124        let key = (self.dedup_key_fn)(&data);
125        let replaced_id = self.by_key.get(&key).copied();
126        if let Some(old_id) = replaced_id {
127            let _ = self.wheel.remove(old_id);
128        }
129
130        let result = self.wheel.schedule(on_tick, data);
131        self.by_key.insert(key, result.id);
132
133        KeyedScheduleResult {
134            id: result.id,
135            replaced_id,
136        }
137    }
138
139    /// Removes event by ID and updates key index.
140    pub fn remove(&mut self, id: EventId) -> Option<Event<T>> {
141        let event = self.wheel.remove(id)?;
142        let key = (self.dedup_key_fn)(event.data());
143        if self.by_key.get(&key).copied() == Some(id) {
144            self.by_key.remove(&key);
145        }
146        Some(event)
147    }
148
149    /// Removes event by dedup key.
150    pub fn remove_by_key(&mut self, key: &K) -> Option<Event<T>> {
151        let id = self.by_key.get(key).copied()?;
152        let event = self.wheel.remove(id)?;
153        self.by_key.remove(key);
154        Some(event)
155    }
156
157    /// Updates event by ID and applies key replacement semantics.
158    pub fn update(&mut self, id: EventId, on_tick: u64, data: T) -> Option<KeyedUpdateResult> {
159        let old_key = {
160            let old_event = self.wheel.get(id)?;
161            (self.dedup_key_fn)(old_event.data())
162        };
163
164        if self.by_key.get(&old_key).copied() == Some(id) {
165            self.by_key.remove(&old_key);
166        }
167
168        let new_key = (self.dedup_key_fn)(&data);
169        let replaced_id = self.by_key.get(&new_key).copied();
170        if let Some(other_id) = replaced_id {
171            let _ = self.wheel.remove(other_id);
172            self.by_key.remove(&new_key);
173        }
174
175        let result = self.wheel.update(id, on_tick, data)?;
176        self.by_key.insert(new_key, result.id);
177
178        Some(KeyedUpdateResult {
179            id: result.id,
180            replaced_id,
181        })
182    }
183
184    /// Updates event by key.
185    pub fn update_by_key(&mut self, key: &K, on_tick: u64, data: T) -> Option<KeyedUpdateResult> {
186        let id = self.id_by_key(key)?;
187        self.update(id, on_tick, data)
188    }
189
190    /// Reschedules event by ID preserving payload and key.
191    pub fn reschedule(&mut self, id: EventId, on_tick: u64) -> Option<KeyedUpdateResult> {
192        let key = {
193            let event = self.wheel.get(id)?;
194            (self.dedup_key_fn)(event.data())
195        };
196
197        let result = self.wheel.reschedule(id, on_tick)?;
198        self.by_key.insert(key, result.id);
199
200        Some(KeyedUpdateResult {
201            id: result.id,
202            replaced_id: None,
203        })
204    }
205
206    /// Reschedules event by key.
207    pub fn reschedule_by_key(&mut self, key: &K, on_tick: u64) -> Option<KeyedUpdateResult> {
208        let id = self.id_by_key(key)?;
209        self.reschedule(id, on_tick)
210    }
211
212    /// Pops one current-tick wave and keeps key index in sync.
213    pub fn pop_events(&mut self) -> Vec<Event<T>> {
214        let events = self.wheel.pop_events();
215        for event in &events {
216            let key = (self.dedup_key_fn)(event.data());
217            if self.by_key.get(&key).copied() == Some(event.id()) {
218                self.by_key.remove(&key);
219            }
220        }
221        events
222    }
223}