Skip to main content

frozen_core/
ack.rs

1//! Acknowledge primitives for durability tracking
2//!
//! In our storage system, an acknowledgement represents the eventual durability state of a
//! previously submitted write operation.
5//!
6//! ## Example
7//!
8//! ```
9//! use frozen_core::ack::{AckTicket, Completion};
10//! use std::sync::Arc;
11//!
12//! let completion = Arc::new(Completion::default());
13//!
14//! let epoch = completion.increment_current_epoch();
15//! let ticket = AckTicket::new(epoch, completion.clone());
16//!
17//! completion.mark_epoch_as_durable(epoch);
18//! completion.notify_all_listeners();
19//!
20//! let durable_epoch = futures::executor::block_on(ticket).unwrap();
21//! assert_eq!(durable_epoch, epoch);
22//! ```
23
24use crate::{
25    error::{FrozenError, FrozenResult},
26    hints,
27};
28use std::{future, pin, ptr, sync, sync::atomic, task};
29
/// A monotonically increasing epoch used as an identifier for tracking the durability of write
/// operations
pub type TEpoch = u64;
32
33#[derive(Debug)]
34struct AckError(atomic::AtomicPtr<FrozenError>);
35
36impl Default for AckError {
37    fn default() -> Self {
38        Self(atomic::AtomicPtr::new(ptr::null_mut()))
39    }
40}
41
42impl Drop for AckError {
43    fn drop(&mut self) {
44        let err_ptr = self.0.load(atomic::Ordering::Acquire);
45        if !err_ptr.is_null() {
46            let _ = unsafe { Box::from_raw(err_ptr) };
47        }
48    }
49}
50
51/// A shared durability acknowledgement state used for issuing [`AckTicket`]'s
52///
53/// The completion state tracks following things:
54///
55/// * The latest assigned epoch
56/// * The latest durable epoch
57/// * Waiters blocked on durability advancement
58/// * Durability errors (if any) blocking durability progress
59///
60/// ## Example
61///
62/// ```
63/// use frozen_core::ack::Completion;
64///
65/// let completion = Completion::default();
66///
67/// assert_eq!(completion.read_current_epoch(), 0);
68/// assert_eq!(completion.read_durable_epoch(), 0);
69/// ```
70#[derive(Debug)]
71pub struct Completion {
72    current_epoch: atomic::AtomicU64,
73    durable_epoch: atomic::AtomicU64,
74    error: AckError,
75    event: event_listener::Event,
76}
77
78impl Default for Completion {
79    fn default() -> Self {
80        Self {
81            current_epoch: atomic::AtomicU64::new(0),
82            durable_epoch: atomic::AtomicU64::new(0),
83            error: AckError::default(),
84            event: event_listener::Event::new(),
85        }
86    }
87}
88
89impl Completion {
90    /// Advance current and return next durability epoch
91    ///
92    /// *NOTE:* Epoch value is monotonically increasing and used to identify unique write
93    /// operations.
94    ///
95    /// ## Example
96    ///
97    /// ```
98    /// use frozen_core::ack::Completion;
99    ///
100    /// let completion = Completion::default();
101    ///
102    /// assert_eq!(completion.increment_current_epoch(), 1);
103    /// assert_eq!(completion.increment_current_epoch(), 2);
104    /// assert_eq!(completion.increment_current_epoch(), 3);
105    /// ```
106    #[inline]
107    pub fn increment_current_epoch(&self) -> TEpoch {
108        self.current_epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
109    }
110
111    /// Mark given [`TEpoch`] as durable
112    ///
113    /// *NOTE:* Once an epoch is marked durable, all earlier epochs are implicitly understood to be
114    /// durable.
115    ///
116    /// ## Example
117    ///
118    /// ```
119    /// use frozen_core::ack::Completion;
120    ///
121    /// let completion = Completion::default();
122    /// completion.mark_epoch_as_durable(0x0A);
123    ///
124    /// assert_eq!(completion.read_durable_epoch(), 0x0A);
125    /// ```
126    #[inline]
127    pub fn mark_epoch_as_durable(&self, epoch: TEpoch) {
128        self.durable_epoch.store(epoch, atomic::Ordering::Release);
129    }
130
131    /// Fetch the acknowledgement error (if any)
132    ///
133    /// ## Example
134    ///
135    /// ```
136    /// use frozen_core::ack::Completion;
137    ///
138    /// let completion = Completion::default();
139    /// assert_eq!(completion.get_err(), None);
140    /// ```
141    #[inline]
142    pub fn get_err(&self) -> Option<FrozenError> {
143        let curr_err = self.error.0.load(atomic::Ordering::Acquire);
144        if hints::unlikely(!curr_err.is_null()) {
145            let frozen_error = unsafe { (*curr_err).clone() };
146            return Some(frozen_error);
147        }
148
149        None
150    }
151
152    /// Update current acknowledgement error w/ a new [`FrozenError`]
153    ///
154    /// ## Example
155    ///
156    /// ```
157    /// use frozen_core::{ack::Completion, error::{FrozenError, ErrCode}};
158    ///
159    /// let completion = Completion::default();
160    /// let new_error = FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "failed to read file");
161    ///
162    /// completion.set_err(new_error.clone());
163    /// assert_eq!(completion.get_err(), Some(new_error));
164    /// ```
165    #[inline]
166    pub fn set_err(&self, new_error: FrozenError) {
167        let boxed_error = Box::into_raw(Box::new(new_error));
168        let old_err = self.error.0.swap(boxed_error, atomic::Ordering::AcqRel);
169
170        if hints::unlikely(!old_err.is_null()) {
171            let _ = unsafe { Box::from_raw(old_err) };
172        }
173    }
174
175    /// Clear acknowledgement error by replacing the underying error w/ an empty pointer
176    ///
177    /// ## Example
178    ///
179    /// ```
180    /// use frozen_core::{ack::Completion, error::{FrozenError, ErrCode}};
181    ///
182    /// let completion = Completion::default();
183    /// let new_error = FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "failed to read file");
184    ///
185    /// completion.set_err(new_error.clone());
186    /// assert!(completion.get_err().is_some());
187    ///
188    /// completion.del_err();
189    /// assert!(completion.get_err().is_none());
190    /// ```
191    #[inline]
192    pub fn del_err(&self) {
193        let old_err = self.error.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
194        if hints::unlikely(!old_err.is_null()) {
195            let _ = unsafe { Box::from_raw(old_err) };
196        }
197    }
198
199    /// Read the latest assigned epoch
200    ///
201    /// ## Example
202    ///
203    /// ```
204    /// use frozen_core::ack::Completion;
205    ///
206    /// let completion = Completion::default();
207    /// completion.increment_current_epoch();
208    ///
209    /// assert_eq!(completion.read_current_epoch(), 1);
210    /// ```
211    #[inline]
212    pub fn read_current_epoch(&self) -> TEpoch {
213        self.current_epoch.load(atomic::Ordering::Acquire)
214    }
215
216    /// Read the latest durable epoch
217    ///
218    /// ## Example
219    ///
220    /// ```
221    /// use frozen_core::ack::Completion;
222    ///
223    /// let completion = Completion::default();
224    /// completion.mark_epoch_as_durable(0x3A);
225    ///
226    /// assert_eq!(completion.read_durable_epoch(), 0x3A);
227    /// ```
228    #[inline]
229    pub fn read_durable_epoch(&self) -> TEpoch {
230        self.durable_epoch.load(atomic::Ordering::Acquire)
231    }
232
233    /// Wake all the listeners currently waiting for durability progress
234    ///
235    /// *NOTE:* Waking listeners does not modify any durable state and are typically called after
236    /// advancing the durable epoch or after occurence of [`AckError`].
237    ///
238    /// ## Example
239    ///
240    /// ```
241    /// use frozen_core::ack::Completion;
242    ///
243    /// let completion = Completion::default();
244    /// completion.notify_all_listeners();
245    /// ```
246    #[inline]
247    pub fn notify_all_listeners(&self) {
248        self.event.notify(usize::MAX);
249    }
250}
251
252/// Durability handle associated with the write operation
253///
254/// ## Epoch
255///
256/// Every ticket is assigned a monotonically increasing epoch to moniter durability
257///
258/// ## Durability Guarantee
259///
260/// If wanted, the ticket could be awaited to poll till the epoch becomes durable.
261///
262/// Once a await on ticket is completed successfully, all writes assigned to earlier epochs are
263/// also guaranteed to be durable.
264///
265/// *NOTE:* Using `await` is optional. Callers that only require fire-and-forget semantics may
266/// simply discard the returned ticket.
267#[derive(Debug)]
268pub struct AckTicket {
269    epoch: TEpoch,
270    completion: sync::Arc<Completion>,
271    listener: Option<event_listener::EventListener>,
272}
273
274impl AckTicket {
275    /// Construct a new [`AckTicket`] for a write operation
276    ///
277    /// ## Example
278    ///
279    /// ```
280    /// use frozen_core::ack::{AckTicket, Completion};
281    /// use std::sync::Arc;
282    ///
283    /// let completion = Arc::new(Completion::default());
284    /// let ticket = AckTicket::new(1, completion);
285    ///
286    /// assert_eq!(ticket.epoch(), 1);
287    /// ```
288    #[inline]
289    pub const fn new(epoch: TEpoch, completion: sync::Arc<Completion>) -> Self {
290        Self { epoch, completion, listener: None }
291    }
292
293    /// Read assigned durability epoch for the [`AckTicket`]
294    ///
295    /// ## Example
296    ///
297    /// ```
298    /// use frozen_core::ack::{AckTicket, Completion};
299    /// use std::sync::Arc;
300    ///
301    /// let completion = Arc::new(Completion::default());
302    /// let ticket = AckTicket::new(0x4C, completion);
303    ///
304    /// assert_eq!(ticket.epoch(), 0x4C);
305    /// ```
306    #[inline(always)]
307    pub const fn epoch(&self) -> TEpoch {
308        self.epoch
309    }
310
311    #[inline]
312    fn is_ready(&self) -> bool {
313        self.completion.read_durable_epoch() >= self.epoch
314    }
315}
316
317impl future::Future for AckTicket {
318    type Output = FrozenResult<TEpoch>;
319
320    fn poll(mut self: pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
321        loop {
322            if self.is_ready() {
323                return task::Poll::Ready(Ok(self.epoch));
324            }
325
326            if let Some(frozen_err) = self.completion.get_err() {
327                return task::Poll::Ready(Err(frozen_err));
328            }
329
330            if self.listener.is_none() {
331                self.listener = Some(self.completion.event.listen());
332
333                // NOTE: prevent lost wakeups
334                continue;
335            }
336
337            let listener = self.listener.as_mut().unwrap();
338            match pin::Pin::new(listener).poll(cx) {
339                task::Poll::Ready(()) => {
340                    self.listener = None;
341
342                    // NOTE: Re-check durable epoch & error
343                    continue;
344                }
345
346                task::Poll::Pending => {
347                    return task::Poll::Pending;
348                }
349            }
350        }
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ErrCode;
    use std::{sync, thread, time};

    /// Delay applied by background threads before publishing progress or an error, giving the
    /// awaiting ticket time to register its listener first.
    const PUBLISH_DELAY: time::Duration = time::Duration::from_millis(0x0A);

    /// The I/O failure shared by most tests.
    fn io_failure() -> FrozenError {
        FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "failure")
    }

    /// Run `work` on a detached background thread after [`PUBLISH_DELAY`].
    fn after_delay(work: impl FnOnce() + Send + 'static) {
        thread::spawn(move || {
            thread::sleep(PUBLISH_DELAY);
            work();
        });
    }

    mod completion {
        use super::*;

        #[test]
        fn ok_increment_current_epoch() {
            let completion = Completion::default();

            for expected in 1..=3 {
                assert_eq!(completion.increment_current_epoch(), expected);
            }
        }

        #[test]
        fn ok_mark_epoch_as_durable() {
            let completion = Completion::default();
            completion.mark_epoch_as_durable(0x0C);

            assert_eq!(completion.read_durable_epoch(), 0x0C);
        }

        #[test]
        fn ok_set_get_err() {
            let completion = Completion::default();
            let expected = io_failure();

            completion.set_err(expected.clone());
            assert_eq!(completion.get_err(), Some(expected));
        }

        #[test]
        fn ok_del_err() {
            let completion = Completion::default();

            completion.set_err(io_failure());
            assert!(completion.get_err().is_some());

            completion.del_err();
            assert!(completion.get_err().is_none());
        }

        #[test]
        fn ok_set_err_overwrites_previous() {
            let completion = Completion::default();
            let first = FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "first");
            let second = FrozenError::new(0x11, 0x21, ErrCode::new(0x31, "sync"), "second");

            completion.set_err(first);
            completion.set_err(second.clone());

            assert_eq!(completion.get_err(), Some(second));
        }
    }

    mod ack_ticket {
        use super::*;

        /// Drive `ticket` to completion on the current thread.
        fn wait(ticket: AckTicket) -> FrozenResult<TEpoch> {
            futures::executor::block_on(ticket)
        }

        #[test]
        fn ok_new() {
            let ticket = AckTicket::new(0x23, sync::Arc::new(Completion::default()));

            assert_eq!(ticket.epoch(), 0x23);
        }

        #[test]
        fn ok_await_when_epoch_already_durable() {
            let completion = sync::Arc::new(Completion::default());
            completion.mark_epoch_as_durable(0x0A);

            let durable_epoch = wait(AckTicket::new(0x0A, completion)).expect("ticket must complete");
            assert_eq!(durable_epoch, 0x0A);
        }

        #[test]
        fn ok_await_after_durability_progress() {
            let completion = sync::Arc::new(Completion::default());
            let ticket = AckTicket::new(1, sync::Arc::clone(&completion));

            after_delay(move || {
                completion.mark_epoch_as_durable(1);
                completion.notify_all_listeners();
            });

            assert_eq!(wait(ticket).expect("ticket must complete"), 1);
        }

        #[test]
        fn err_await_when_error_is_present() {
            let completion = sync::Arc::new(Completion::default());
            let expected_error = io_failure();
            completion.set_err(expected_error.clone());

            let err = wait(AckTicket::new(1, completion)).expect_err("ticket must fail");
            assert_eq!(err, expected_error);
        }

        #[test]
        fn err_await_when_error_arrives_later() {
            let completion = sync::Arc::new(Completion::default());
            let ticket = AckTicket::new(1, sync::Arc::clone(&completion));
            let expected_error = io_failure();

            let published = expected_error.clone();
            after_delay(move || {
                completion.set_err(published);
                completion.notify_all_listeners();
            });

            let err = wait(ticket).expect_err("ticket must fail");
            assert_eq!(err, expected_error);
        }

        #[test]
        fn ok_multiple_tickets_waiting_for_same_epoch() {
            let completion = sync::Arc::new(Completion::default());
            let ticket_for = |epoch| AckTicket::new(epoch, sync::Arc::clone(&completion));
            let (ticket_1, ticket_2, ticket_3) = (ticket_for(1), ticket_for(1), ticket_for(1));

            after_delay(move || {
                completion.mark_epoch_as_durable(1);
                completion.notify_all_listeners();
            });

            assert_eq!(wait(ticket_1).expect("ticket_1 must complete"), 1);
            assert_eq!(wait(ticket_2).expect("ticket_2 must complete"), 1);
            assert_eq!(wait(ticket_3).expect("ticket_3 must complete"), 1);
        }

        #[test]
        fn ok_multiple_epochs_complete_in_order() {
            let completion = sync::Arc::new(Completion::default());
            let ticket_for = |epoch| AckTicket::new(epoch, sync::Arc::clone(&completion));
            let (ticket_1, ticket_2, ticket_3) = (ticket_for(1), ticket_for(2), ticket_for(3));

            completion.mark_epoch_as_durable(3);

            assert_eq!(wait(ticket_1).expect("ticket_1 must complete"), 1);
            assert_eq!(wait(ticket_2).expect("ticket_2 must complete"), 2);
            assert_eq!(wait(ticket_3).expect("ticket_3 must complete"), 3);
        }
    }
}