async_map/single_writer_versioned/
mod.rs1mod private {
9 use std::cell::Cell;
10 use std::ops::Deref;
11 use std::sync::atomic::{AtomicU32, Ordering};
12 use std::sync::Arc;
13
14 pub trait Data: Send + Sync + std::fmt::Debug + 'static {}
15 impl<T: Send + Sync + std::fmt::Debug + 'static> Data for T {}
16
17 pub struct Version<T>
18 where
19 T: Data,
20 {
21 version: u32,
22 data: T,
23 next: Cell<Option<Arc<Version<T>>>>,
24 latest_version: Arc<AtomicU32>,
25 }
26
27 impl<T: Data> std::fmt::Debug for Version<T> {
28 fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result {
29 f.write_fmt(format_args!("Version[{}]", self.version))
30 }
31 }
32
33 impl<T> Version<T>
34 where
35 T: Data,
36 {
37 pub fn initial(data: T) -> (Arc<Version<T>>, Updater<T>) {
38 let initial_version =
39 Arc::new(Version::new_version(data, 0, Arc::new(AtomicU32::new(0))));
40 let updater = Updater {
41 version: initial_version.clone(),
42 };
43 (initial_version, updater)
44 }
45
46 pub fn as_ref(&self) -> &T {
47 &self.data
48 }
49
50 pub fn latest<'a>(self: &'a Arc<Version<T>>) -> Option<&'a Arc<Version<T>>> {
51 let latest_version = self.latest_version.load(Ordering::Acquire);
52
53 if self.version == latest_version {
54 None
55 } else {
56 Some(self.get_version(latest_version))
60 }
61 }
62
63 fn next<'a>(self: &'a Arc<Version<T>>) -> &'a Arc<Version<T>> {
66 unsafe { &*self.next.as_ptr() }.as_ref().unwrap()
67 }
68
69 fn get_version<'a>(self: &'a Arc<Version<T>>, version: u32) -> &'a Arc<Version<T>> {
73 if self.version == version {
74 self
75 } else {
76 self.next().get_version(version)
77 }
78 }
79
80 fn set_next(&self, data: T) -> Result<(Arc<Version<T>>, Updater<T>), T> {
81 let latest_version = self.latest_version.load(Ordering::Acquire);
82
83 if latest_version != self.version {
85 return Err(data);
87 }
88
89 let new_version = latest_version + 1;
90
91 let next = Arc::new(Version::new_version(
92 data,
93 new_version,
94 self.latest_version.clone(),
95 ));
96
97 self.next.replace(Some(next.clone()));
99
100 self.latest_version.store(new_version, Ordering::Release);
103 let updater = Updater {
104 version: next.clone(),
105 };
106 Ok((next, updater))
107 }
108
109 fn new_version(data: T, version: u32, latest_version: Arc<AtomicU32>) -> Version<T> {
110 let result = Version {
111 version,
112 data,
113 next: Cell::new(None),
114 latest_version,
115 };
116 result
117 }
118 }
119
120 unsafe impl<T> Send for Version<T> where T: Data {}
121 unsafe impl<T> Sync for Version<T> where T: Data {}
122
123 impl<T> Deref for Version<T>
124 where
125 T: Data,
126 {
127 type Target = T;
128 fn deref(&self) -> &T {
129 &self.data
130 }
131 }
132
133 pub struct Updater<T>
134 where
135 T: Data,
136 {
137 version: Arc<Version<T>>,
138 }
139
140 impl<T> Updater<T>
141 where
142 T: Data,
143 {
144 pub fn update(self, new_data: T) -> (Arc<Version<T>>, Updater<T>) {
145 self.version.set_next(new_data).expect("Illegal State") }
147 }
148
149 #[cfg(test)]
150 mod test {
151 #![allow(mutable_transmutes)]
152 use super::Version;
153 #[test]
154 fn it_creates_sensible_initial() {
155 let version = Version::initial("hello").0;
156 assert_eq!("hello", version.data);
157 assert_eq!(0, version.version);
158 }
159
160 #[test]
161 fn it_accepts_a_next_version() {
162 let (first, _) = Version::initial("hello");
163 let (second, _) = first.set_next("goodbye").unwrap();
164
165 assert_eq!("hello", first.data);
166 assert_eq!(0, first.version);
167 assert_eq!(second.version, first.next().as_ref().version);
168
169 assert_eq!("goodbye", second.data);
170 assert_eq!(1, second.version);
171 }
172
173 #[test]
174 fn it_does_not_update_next_version() {
175 let (first, _) = Version::initial("hello");
176 let (_, _) = first.set_next("goodbye").unwrap();
177 let result = first.set_next("au revoir");
178
179 assert_eq!(true, result.is_err());
180 }
181
182 #[tokio::test]
183 async fn it_can_be_used_across_tasks() {
184 let version = Version::initial("hello").0;
185
186 version.set_next("goodbye").unwrap();
187
188 tokio::task::spawn(async move {
189 assert_eq!("goodbye", version.next().data);
190 })
191 .await
192 .unwrap();
193 }
194
195 #[test]
196 fn latest_returns_none_on_latest() {
197 let first = Version::initial("hello").0;
198
199 assert_eq!(true, first.latest().is_none());
200
201 let second = first.set_next("goodbye").unwrap().0;
202 assert_eq!(true, second.latest().is_none());
203 }
204
205 #[test]
206 fn latest_returns_latest() {
207 let first = Version::initial("hello").0;
208
209 let second = first.set_next("goodbye").unwrap().0;
210 assert_eq!("goodbye", first.latest().unwrap().data);
211
212 let third = second.set_next("servus").unwrap().0;
213 assert_eq!("servus", first.latest().unwrap().data);
214 assert_eq!("servus", second.latest().unwrap().data);
215 assert_eq!(true, third.latest().is_none());
216 }
217
218 #[test]
219 fn updater_updates() {
220 let (first, updater) = Version::initial("hello");
221
222 let (second, updater) = updater.update("goodbye");
223 assert_eq!("goodbye", first.latest().unwrap().data);
224
225 let third = updater.update("servus").0;
226 assert_eq!("servus", first.latest().unwrap().data);
227 assert_eq!("servus", second.latest().unwrap().data);
228 assert_eq!(true, third.latest().is_none());
229 }
230 }
231}
232
233use self::private::{Data, Updater, Version};
234use std::cell::RefCell;
235use std::sync::Arc;
236use tokio::sync::mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender};
237
238pub trait DataUpdater<T>: (FnOnce(&T) -> Option<T>) + Send + 'static
239where
240 T: Data,
241{
242}
243
244impl<T, S: (FnOnce(&T) -> Option<T>) + Send + 'static> DataUpdater<T> for S where T: Data {}
245
246enum VersionedUpdaterAction<T>
247where
248 T: Data,
249{
250 Update(Box<dyn DataUpdater<T>>),
251 Quit,
252}
253
254#[derive(Clone, Debug)]
264pub struct Versioned<T>
265where
266 T: Data,
267{
268 current_holder: RefCell<Arc<Version<T>>>,
269 update_sender: UnboundedSender<VersionedUpdaterAction<T>>,
270}
271
272pub struct Quitter<T>
285where
286 T: Data,
287{
288 update_sender: UnboundedSender<VersionedUpdaterAction<T>>,
289}
290
291impl<T> Quitter<T>
292where
293 T: Data,
294{
295 pub fn quit(self) {
296 if let Err(_) = self.update_sender.send(VersionedUpdaterAction::Quit) {
297 }
299 }
300}
301
302impl<T> Versioned<T>
303where
304 T: Data,
305{
306 pub fn from_initial(data: T) -> (Self, Quitter<T>) {
309 let (initial_version, update_sender) = VersionedUpdater::start_from_initial(data);
310
311 (
312 Versioned {
313 current_holder: RefCell::from(initial_version),
314 update_sender: update_sender.clone(),
315 },
316 Quitter { update_sender },
317 )
318 }
319
320 pub fn with_latest<U, F: FnOnce(&T) -> U>(&self, action: F) -> U {
325 self.ensure_latest();
326 let the_ref = self.current_holder.borrow();
327 action(&***the_ref)
328 }
329
330 fn ensure_latest(&self) {
331 let current = self.current_holder.borrow();
332
333 if let Some(new_version) = current.latest() {
334 let new_version = new_version.clone();
335 drop(current); self.current_holder.replace(new_version);
337 }
338 }
339
340 pub fn update(
344 &self,
345 update_fn: Box<dyn DataUpdater<T>>,
346 ) -> Result<(), Box<dyn DataUpdater<T>>> {
347 self.update_sender
348 .send(VersionedUpdaterAction::Update(update_fn))
349 .map_err(|action| match action {
350 mpsc::error::SendError(VersionedUpdaterAction::Update(update_fn)) => update_fn,
351 _ => panic!("Received illegal error"),
352 })
353 }
354}
355
356struct VersionedUpdater<T>
358where
359 T: Data,
360{
361 current: (Arc<Version<T>>, Updater<T>),
362 update_receiver: UnboundedReceiver<VersionedUpdaterAction<T>>,
363}
364
365impl<T> VersionedUpdater<T>
366where
367 T: Data,
368{
369 fn start_from_initial(
370 data: T,
371 ) -> (Arc<Version<T>>, UnboundedSender<VersionedUpdaterAction<T>>) {
372 let (initial_version, updater) = Version::initial(data);
373
374 let (update_sender, update_receiver) = unbounded_channel();
375
376 let current = (initial_version.clone(), updater);
377
378 VersionedUpdater {
379 current,
380 update_receiver,
381 }
382 .run();
383
384 (initial_version, update_sender)
385 }
386
387 fn run(mut self) {
388 tokio::task::spawn(async move {
389 while let Some(action) = self.update_receiver.recv().await {
390 match action {
391 VersionedUpdaterAction::Update(update_fn) => {
392 if let Some(new_data) = update_fn(self.current.0.as_ref().as_ref()) {
393 self.current = self.current.1.update(new_data);
394 }
395 }
396 VersionedUpdaterAction::Quit => {
397 break;
398 }
399 }
400 }
401 });
402 }
403}
404
405#[cfg(test)]
406mod test {
407 use super::*;
408 use std::sync::atomic::{AtomicU32, Ordering};
409
410 #[tokio::test]
411 async fn intial_holds_passed_data() {
412 let versioned = Versioned::from_initial(String::from("Hello")).0;
413
414 versioned.with_latest(|data| assert_eq!("Hello", data));
415 }
416
417 #[tokio::test]
418 async fn updates_are_processed() {
419 let versioned = Versioned::from_initial(String::from("Hello")).0;
420
421 versioned
423 .update(Box::new(|old| Some(old.clone() + ", World")))
424 .map_err(|_| ())
425 .expect("Should be ok");
426
427 tokio::task::yield_now().await;
428
429 versioned.with_latest(|data| assert_eq!("Hello, World", data));
430 }
431
432 #[tokio::test]
433 async fn updates_are_shared() {
434 let versioned = Versioned::from_initial(String::from("Hello")).0;
435 let clone = versioned.clone();
436 versioned
438 .update(Box::new(|old| Some(old.clone() + ", World")))
439 .map_err(|_| ())
440 .expect("Should be ok");
441
442 tokio::task::yield_now().await;
443
444 versioned.with_latest(|data| assert_eq!("Hello, World", data));
445 clone.with_latest(|data| assert_eq!("Hello, World", data));
446 }
447
448 #[tokio::test]
449 async fn quitter_quits() {
450 let tuple = Versioned::from_initial(String::from("Hello"));
451 let versioned = tuple.0;
452 let quitter = tuple.1;
453
454 versioned
456 .update(Box::new(|old| Some(old.clone() + ", World")))
457 .map_err(|_| ())
458 .expect("Should be ok");
459 tokio::task::yield_now().await;
460
461 quitter.quit();
462 tokio::task::yield_now().await;
463
464 let res = versioned.update(Box::new(|old| Some(old.clone() + "! And Moon!")));
465
466 assert_eq!(true, res.is_err());
467
468 tokio::task::yield_now().await;
469 versioned.with_latest(|data| assert_eq!("Hello, World", data));
471 }
472
473 #[derive(Debug)]
474 struct TestData {
475 drop_counter: Arc<AtomicU32>,
476 }
477
478 impl Drop for TestData {
479 fn drop(&mut self) {
480 self.drop_counter.fetch_add(1, Ordering::Release);
481 }
482 }
483
484 #[tokio::test]
485 async fn old_versions_are_purged() {
486 let counter = Arc::<AtomicU32>::default();
487 let drop_counter = counter.clone();
488
489 let versioned: Versioned<Arc<TestData>> = Versioned::from_initial(Arc::new(TestData {
490 drop_counter: drop_counter,
491 }))
492 .0;
493 let clone = versioned.clone();
494
495 assert_eq!(0, counter.load(Ordering::Acquire));
496
497 let drop_counter = counter.clone();
498
499 versioned
501 .update(Box::new(|_| {
502 Some(Arc::new(TestData {
503 drop_counter: drop_counter,
504 }))
505 }))
506 .map_err(|_| ())
507 .expect("Should be ok");
508
509 tokio::task::yield_now().await;
510 versioned.with_latest(Box::new(|_: &Arc<TestData>| ()));
512 tokio::task::yield_now().await;
513
514 assert_eq!(0, counter.load(Ordering::Acquire));
516
517 clone.with_latest(Box::new(|_: &Arc<TestData>| ()));
519 tokio::task::yield_now().await;
520 assert_eq!(1, counter.load(Ordering::Acquire));
522
523 drop(versioned);
524 drop(clone);
525
526 tokio::task::yield_now().await;
527
528 assert_eq!(2, counter.load(Ordering::Acquire));
531 }
532
533 fn any_test<T: std::any::Any + Send + 'static>(func: Box<dyn FnOnce() -> Box<T>>) -> Box<T> {
534 func()
535 }
536
537 #[tokio::test]
538 async fn test_any() {
539 let foo = any_test(Box::new(|| Box::new(String::from("Hello"))));
540
541 assert_eq!("Hello", *foo);
542
543 let bar = any_test(Box::new(|| {
544 Box::new(im::HashMap::new().update("key", "secret"))
545 }));
546
547 assert_eq!("secret", *bar.get("key").unwrap());
548 }
549}