1use crate::{
25 error::{FrozenError, FrozenResult},
26 hints,
27};
28use std::{future, pin, ptr, sync, sync::atomic, task};
29
/// Epoch counter type used by [`Completion`] and [`AckTicket`].
pub type TEpoch = u64;
32
33#[derive(Debug)]
34struct AckError(atomic::AtomicPtr<FrozenError>);
35
36impl Default for AckError {
37 fn default() -> Self {
38 Self(atomic::AtomicPtr::new(ptr::null_mut()))
39 }
40}
41
42impl Drop for AckError {
43 fn drop(&mut self) {
44 let err_ptr = self.0.load(atomic::Ordering::Acquire);
45 if !err_ptr.is_null() {
46 let _ = unsafe { Box::from_raw(err_ptr) };
47 }
48 }
49}
50
51#[derive(Debug)]
71pub struct Completion {
72 current_epoch: atomic::AtomicU64,
73 durable_epoch: atomic::AtomicU64,
74 error: AckError,
75 event: event_listener::Event,
76}
77
78impl Default for Completion {
79 fn default() -> Self {
80 Self {
81 current_epoch: atomic::AtomicU64::new(0),
82 durable_epoch: atomic::AtomicU64::new(0),
83 error: AckError::default(),
84 event: event_listener::Event::new(),
85 }
86 }
87}
88
89impl Completion {
90 #[inline]
107 pub fn increment_current_epoch(&self) -> TEpoch {
108 self.current_epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
109 }
110
111 #[inline]
127 pub fn mark_epoch_as_durable(&self, epoch: TEpoch) {
128 self.durable_epoch.store(epoch, atomic::Ordering::Release);
129 }
130
131 #[inline]
142 pub fn get_err(&self) -> Option<FrozenError> {
143 let curr_err = self.error.0.load(atomic::Ordering::Acquire);
144 if hints::unlikely(!curr_err.is_null()) {
145 let frozen_error = unsafe { (*curr_err).clone() };
146 return Some(frozen_error);
147 }
148
149 None
150 }
151
152 #[inline]
166 pub fn set_err(&self, new_error: FrozenError) {
167 let boxed_error = Box::into_raw(Box::new(new_error));
168 let old_err = self.error.0.swap(boxed_error, atomic::Ordering::AcqRel);
169
170 if hints::unlikely(!old_err.is_null()) {
171 let _ = unsafe { Box::from_raw(old_err) };
172 }
173 }
174
175 #[inline]
192 pub fn del_err(&self) {
193 let old_err = self.error.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
194 if hints::unlikely(!old_err.is_null()) {
195 let _ = unsafe { Box::from_raw(old_err) };
196 }
197 }
198
199 #[inline]
212 pub fn read_current_epoch(&self) -> TEpoch {
213 self.current_epoch.load(atomic::Ordering::Acquire)
214 }
215
216 #[inline]
229 pub fn read_durable_epoch(&self) -> TEpoch {
230 self.durable_epoch.load(atomic::Ordering::Acquire)
231 }
232
233 #[inline]
247 pub fn notify_all_listeners(&self) {
248 self.event.notify(usize::MAX);
249 }
250}
251
252#[derive(Debug)]
268pub struct AckTicket {
269 epoch: TEpoch,
270 completion: sync::Arc<Completion>,
271 listener: Option<event_listener::EventListener>,
272}
273
274impl AckTicket {
275 #[inline]
289 pub const fn new(epoch: TEpoch, completion: sync::Arc<Completion>) -> Self {
290 Self { epoch, completion, listener: None }
291 }
292
293 #[inline(always)]
307 pub const fn epoch(&self) -> TEpoch {
308 self.epoch
309 }
310
311 #[inline]
312 fn is_ready(&self) -> bool {
313 self.completion.read_durable_epoch() >= self.epoch
314 }
315}
316
317impl future::Future for AckTicket {
318 type Output = FrozenResult<TEpoch>;
319
320 fn poll(mut self: pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
321 loop {
322 if self.is_ready() {
323 return task::Poll::Ready(Ok(self.epoch));
324 }
325
326 if let Some(frozen_err) = self.completion.get_err() {
327 return task::Poll::Ready(Err(frozen_err));
328 }
329
330 if self.listener.is_none() {
331 self.listener = Some(self.completion.event.listen());
332
333 continue;
335 }
336
337 let listener = self.listener.as_mut().unwrap();
338 match pin::Pin::new(listener).poll(cx) {
339 task::Poll::Ready(()) => {
340 self.listener = None;
341
342 continue;
344 }
345
346 task::Poll::Pending => {
347 return task::Poll::Pending;
348 }
349 }
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ErrCode;
    use std::{sync, thread, time};

    /// Delay background threads wait before publishing their update.
    const PUBLISH_DELAY: time::Duration = time::Duration::from_millis(0x0A);

    /// Builds the error value shared by most tests.
    fn io_failure() -> FrozenError {
        FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "failure")
    }

    /// Spawns a detached thread that sleeps, applies `publish` and then notifies all listeners.
    fn publish_later<F>(completion: &sync::Arc<Completion>, publish: F)
    where
        F: FnOnce(&Completion) + Send + 'static,
    {
        let completion = sync::Arc::clone(completion);

        thread::spawn(move || {
            thread::sleep(PUBLISH_DELAY);

            publish(&completion);
            completion.notify_all_listeners();
        });
    }

    mod completion {
        use super::*;

        #[test]
        fn ok_increment_current_epoch() {
            let state = Completion::default();

            for expected in 1..=3 {
                assert_eq!(state.increment_current_epoch(), expected);
            }
        }

        #[test]
        fn ok_mark_epoch_as_durable() {
            let state = Completion::default();

            state.mark_epoch_as_durable(0x0C);
            let durable = state.read_durable_epoch();

            assert_eq!(durable, 0x0C);
        }

        #[test]
        fn ok_set_get_err() {
            let state = Completion::default();
            let expected = io_failure();

            state.set_err(expected.clone());
            let stored = state.get_err();

            assert_eq!(stored, Some(expected));
        }

        #[test]
        fn ok_del_err() {
            let state = Completion::default();

            state.set_err(io_failure());
            assert!(state.get_err().is_some());

            state.del_err();
            assert!(state.get_err().is_none());
        }

        #[test]
        fn ok_set_err_overwrites_previous() {
            let state = Completion::default();

            let first = FrozenError::new(0x10, 0x20, ErrCode::new(0x30, "io"), "first");
            let second = FrozenError::new(0x11, 0x21, ErrCode::new(0x31, "sync"), "second");

            state.set_err(first);
            state.set_err(second.clone());

            assert_eq!(state.get_err(), Some(second));
        }
    }

    mod ack_ticket {
        use super::*;

        #[test]
        fn ok_new() {
            let shared = sync::Arc::new(Completion::default());

            let ticket = AckTicket::new(0x23, shared);

            assert_eq!(ticket.epoch(), 0x23);
        }

        #[test]
        fn ok_await_when_epoch_already_durable() {
            let shared = sync::Arc::new(Completion::default());
            shared.mark_epoch_as_durable(0x0A);

            let outcome = futures::executor::block_on(AckTicket::new(0x0A, shared));

            assert_eq!(outcome.expect("ticket must complete"), 0x0A);
        }

        #[test]
        fn ok_await_after_durability_progress() {
            let shared = sync::Arc::new(Completion::default());
            let ticket = AckTicket::new(1, sync::Arc::clone(&shared));

            publish_later(&shared, |state: &Completion| state.mark_epoch_as_durable(1));

            let outcome = futures::executor::block_on(ticket);

            assert_eq!(outcome.expect("ticket must complete"), 1);
        }

        #[test]
        fn err_await_when_error_is_present() {
            let shared = sync::Arc::new(Completion::default());
            let expected = io_failure();

            shared.set_err(expected.clone());

            let outcome = futures::executor::block_on(AckTicket::new(1, shared));

            assert_eq!(outcome.expect_err("ticket must fail"), expected);
        }

        #[test]
        fn err_await_when_error_arrives_later() {
            let shared = sync::Arc::new(Completion::default());
            let ticket = AckTicket::new(1, sync::Arc::clone(&shared));
            let expected = io_failure();

            let to_publish = expected.clone();
            publish_later(&shared, move |state: &Completion| state.set_err(to_publish));

            let failure = futures::executor::block_on(ticket).expect_err("ticket must fail");
            assert_eq!(failure, expected);
        }

        #[test]
        fn ok_multiple_tickets_waiting_for_same_epoch() {
            let shared = sync::Arc::new(Completion::default());

            let first = AckTicket::new(1, sync::Arc::clone(&shared));
            let second = AckTicket::new(1, sync::Arc::clone(&shared));
            let third = AckTicket::new(1, sync::Arc::clone(&shared));

            publish_later(&shared, |state: &Completion| state.mark_epoch_as_durable(1));

            let done = futures::executor::block_on(first).expect("ticket_1 must complete");
            assert_eq!(done, 1);

            let done = futures::executor::block_on(second).expect("ticket_2 must complete");
            assert_eq!(done, 1);

            let done = futures::executor::block_on(third).expect("ticket_3 must complete");
            assert_eq!(done, 1);
        }

        #[test]
        fn ok_multiple_epochs_complete_in_order() {
            let shared = sync::Arc::new(Completion::default());

            let first = AckTicket::new(1, sync::Arc::clone(&shared));
            let second = AckTicket::new(2, sync::Arc::clone(&shared));
            let third = AckTicket::new(3, sync::Arc::clone(&shared));

            // One durable mark at the highest epoch satisfies every lower ticket too.
            shared.mark_epoch_as_durable(3);

            let done = futures::executor::block_on(first).expect("ticket_1 must complete");
            assert_eq!(done, 1);

            let done = futures::executor::block_on(second).expect("ticket_2 must complete");
            assert_eq!(done, 2);

            let done = futures::executor::block_on(third).expect("ticket_3 must complete");
            assert_eq!(done, 3);
        }
    }
}