bookkeeper_client/client/
metadata.rs

1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::time::SystemTime;
7
8use arc_swap::{ArcSwap, Guard};
9use atomic::Atomic;
10use static_assertions::assert_not_impl_any;
11
12use super::errors::{BkError, ErrorKind};
13use crate::meta::{MetaVersion, Versioned};
14
15type Result<T, E = BkError> = std::result::Result<T, E>;
16
17/// Ledger id.
18#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
19pub struct LedgerId(pub(crate) i64);
20
21impl TryFrom<i64> for LedgerId {
22    type Error = BkError;
23
24    fn try_from(i: i64) -> Result<LedgerId> {
25        if i < 0 {
26            return Err(BkError::new(ErrorKind::InvalidLedgerId));
27        }
28        Ok(LedgerId(i))
29    }
30}
31
32impl From<LedgerId> for i64 {
33    fn from(ledger_id: LedgerId) -> Self {
34        ledger_id.0
35    }
36}
37
38impl PartialEq<i64> for LedgerId {
39    fn eq(&self, other: &i64) -> bool {
40        self.0.eq(other)
41    }
42}
43
44impl Display for LedgerId {
45    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
46        Display::fmt(&self.0, f)
47    }
48}
49
50/// Ledger length.
51#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
52pub struct LedgerLength(i64);
53
54impl LedgerLength {
55    pub const ZERO: LedgerLength = LedgerLength(0);
56}
57
58impl From<i64> for LedgerLength {
59    fn from(i: i64) -> LedgerLength {
60        LedgerLength(i)
61    }
62}
63
64impl From<LedgerLength> for i64 {
65    fn from(ledger_length: LedgerLength) -> i64 {
66        ledger_length.0
67    }
68}
69
70impl From<usize> for LedgerLength {
71    fn from(u: usize) -> LedgerLength {
72        LedgerLength(u as i64)
73    }
74}
75
76impl Display for LedgerLength {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
78        Display::fmt(&self.0, f)
79    }
80}
81
82impl std::ops::Sub<i64> for LedgerLength {
83    type Output = Self;
84
85    fn sub(self, rhs: i64) -> LedgerLength {
86        LedgerLength(self.0 - rhs)
87    }
88}
89
90impl std::ops::Add<i64> for LedgerLength {
91    type Output = Self;
92
93    fn add(self, rhs: i64) -> LedgerLength {
94        LedgerLength(self.0 + rhs)
95    }
96}
97
98impl std::ops::SubAssign<i64> for LedgerLength {
99    fn sub_assign(&mut self, rhs: i64) {
100        self.0 -= rhs;
101    }
102}
103
104impl std::ops::AddAssign<i64> for LedgerLength {
105    fn add_assign(&mut self, rhs: i64) {
106        self.0 += rhs;
107    }
108}
109
110impl std::ops::Sub<usize> for LedgerLength {
111    type Output = Self;
112
113    fn sub(self, rhs: usize) -> LedgerLength {
114        LedgerLength(self.0 - rhs as i64)
115    }
116}
117
118impl std::ops::Add<usize> for LedgerLength {
119    type Output = Self;
120
121    fn add(self, rhs: usize) -> LedgerLength {
122        LedgerLength(self.0 + rhs as i64)
123    }
124}
125
126impl std::ops::SubAssign<usize> for LedgerLength {
127    fn sub_assign(&mut self, rhs: usize) {
128        self.0 -= rhs as i64;
129    }
130}
131
132impl std::ops::AddAssign<usize> for LedgerLength {
133    fn add_assign(&mut self, rhs: usize) {
134        self.0 += rhs as i64;
135    }
136}
137
138/// Ledger entry id.
139#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
140#[repr(transparent)]
141#[derive(bytemuck::NoUninit)]
142pub struct EntryId(pub(crate) i64);
143
144impl Display for EntryId {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
146        Display::fmt(&self.0, f)
147    }
148}
149
150impl std::cmp::PartialEq<i64> for EntryId {
151    fn eq(&self, other: &i64) -> bool {
152        self.0.eq(other)
153    }
154}
155
156impl std::cmp::PartialOrd<i64> for EntryId {
157    fn partial_cmp(&self, other: &i64) -> Option<std::cmp::Ordering> {
158        self.0.partial_cmp(other)
159    }
160}
161
162impl std::ops::Sub for EntryId {
163    type Output = i64;
164
165    fn sub(self, rhs: EntryId) -> i64 {
166        self.0 - rhs.0
167    }
168}
169
170impl std::ops::Sub<i64> for EntryId {
171    type Output = Self;
172
173    fn sub(self, rhs: i64) -> EntryId {
174        EntryId(self.0 - rhs)
175    }
176}
177
178impl std::ops::Add<i64> for EntryId {
179    type Output = Self;
180
181    fn add(self, rhs: i64) -> EntryId {
182        EntryId(self.0 + rhs)
183    }
184}
185
186impl std::ops::SubAssign<i64> for EntryId {
187    fn sub_assign(&mut self, rhs: i64) {
188        self.0 -= rhs;
189    }
190}
191
192impl std::ops::AddAssign<i64> for EntryId {
193    fn add_assign(&mut self, rhs: i64) {
194        self.0 += rhs;
195    }
196}
197
198impl EntryId {
199    /// Well-known invalid entry id.
200    pub const INVALID: EntryId = EntryId(-1);
201    /// First valid entry id.
202    pub const MIN: EntryId = EntryId(0);
203
204    /// Returns whether entry id is valid.
205    pub const fn is_valid(&self) -> bool {
206        self.0 >= 0
207    }
208
209    /// Constructs entry id from i64.
210    ///
211    /// # Safety
212    /// Returned entry id is invalid if given i64 is negative.
213    pub const unsafe fn unchecked_from_i64(i: i64) -> EntryId {
214        EntryId(i)
215    }
216}
217
218impl TryFrom<i64> for EntryId {
219    type Error = BkError;
220
221    fn try_from(i: i64) -> Result<EntryId> {
222        if i < 0 {
223            return Err(BkError::new(ErrorKind::InvalidEntryId));
224        }
225        Ok(EntryId(i))
226    }
227}
228
229impl From<EntryId> for i64 {
230    fn from(entry_id: EntryId) -> i64 {
231        entry_id.0
232    }
233}
234
235pub(crate) struct AtomicEntryId {
236    entry_id: atomic::Atomic<EntryId>,
237}
238
239impl AtomicEntryId {
240    pub fn get(&self) -> EntryId {
241        self.entry_id.load(atomic::Ordering::Relaxed)
242    }
243
244    pub fn update(&self, entry_id: EntryId) -> EntryId {
245        let mut current = self.get();
246        if entry_id <= current {
247            return current;
248        }
249        while entry_id > current {
250            match self.entry_id.compare_exchange(
251                current,
252                entry_id,
253                atomic::Ordering::Relaxed,
254                atomic::Ordering::Relaxed,
255            ) {
256                Ok(_) => return entry_id,
257                Err(new_entry_id) => current = new_entry_id,
258            }
259        }
260        current
261    }
262}
263
264impl From<EntryId> for AtomicEntryId {
265    fn from(entry_id: EntryId) -> Self {
266        Self { entry_id: Atomic::new(entry_id) }
267    }
268}
269
270#[derive(Clone, Debug, Eq, PartialEq)]
271pub struct LedgerEnsemble {
272    pub(crate) first_entry_id: EntryId,
273    pub(crate) bookies: Vec<BookieId>,
274}
275
276/// Ledger metadata.
277#[derive(Clone, Debug)]
278pub struct LedgerMetadata {
279    pub ledger_id: LedgerId,
280    pub length: LedgerLength,
281    pub last_entry_id: EntryId,
282    pub state: LedgerState,
283    pub password: Vec<u8>,
284    pub ensemble_size: u32,
285    pub write_quorum_size: u32,
286    pub ack_quorum_size: u32,
287    pub ensembles: Vec<LedgerEnsemble>,
288    pub digest_type: DigestType,
289    pub creation_time: Option<SystemTime>,
290    pub creator_token: i64,
291    pub custom_metadata: HashMap<String, Vec<u8>>,
292    pub format_version: i32,
293}
294
295#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
296pub enum LedgerState {
297    Open,
298    InRecovery,
299    Closed,
300}
301
302/// DigestType specifies digest method in ledger entry writting.
303#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
304#[non_exhaustive]
305pub enum DigestType {
306    CRC32,
307    MAC,
308    CRC32C,
309    DUMMY,
310}
311
312pub struct EnsembleIterator<'a> {
313    next: usize,
314    ensembles: &'a [LedgerEnsemble],
315}
316
317impl<'a> Iterator for EnsembleIterator<'a> {
318    type Item = (EntryId, &'a [BookieId], EntryId);
319
320    fn next(&mut self) -> Option<Self::Item> {
321        if self.next >= self.ensembles.len() {
322            return None;
323        }
324        let ensemble = &self.ensembles[self.next];
325        self.next += 1;
326        let next_ensemble_entry_id =
327            if self.next >= self.ensembles.len() { EntryId::INVALID } else { self.ensembles[self.next].first_entry_id };
328        Some((ensemble.first_entry_id, &ensemble.bookies, next_ensemble_entry_id))
329    }
330}
331
332impl LedgerMetadata {
333    /// Returns ensemble for given entry id and next entry id that will have a different ensemble.
334    pub fn ensemble_at(&self, entry_id: EntryId) -> (EntryId, &[BookieId], EntryId) {
335        assert!(entry_id >= EntryId::MIN);
336        assert!(!self.ensembles.is_empty());
337        assert!(self.ensembles[0].first_entry_id == EntryId::MIN);
338        let i = match self.ensembles.binary_search_by_key(&entry_id, |e| e.first_entry_id) {
339            Ok(i) => i,
340            Err(i) => i - 1,
341        };
342        if i + 1 == self.ensembles.len() {
343            (self.ensembles[i].first_entry_id, &self.ensembles[i].bookies, EntryId::INVALID)
344        } else {
345            (self.ensembles[i].first_entry_id, &self.ensembles[i].bookies, self.ensembles[i + 1].first_entry_id)
346        }
347    }
348
349    pub fn ensemble_iter(&self, entry_id: EntryId) -> EnsembleIterator<'_> {
350        assert!(entry_id >= EntryId::MIN);
351        assert!(!self.ensembles.is_empty());
352        assert!(self.ensembles[0].first_entry_id == EntryId::MIN);
353        let i = match self.ensembles.binary_search_by_key(&entry_id, |e| e.first_entry_id) {
354            Ok(i) => i,
355            Err(i) => i - 1,
356        };
357        EnsembleIterator { next: i, ensembles: &self.ensembles }
358    }
359
360    pub fn last_ensemble(&self) -> &LedgerEnsemble {
361        &self.ensembles[self.ensembles.len() - 1]
362    }
363
364    pub fn last_add_confirmed(&self) -> EntryId {
365        if self.closed() {
366            self.last_entry_id
367        } else if self.ensembles.is_empty() {
368            EntryId::INVALID
369        } else {
370            self.last_ensemble().first_entry_id - 1
371        }
372    }
373
374    pub fn closed(&self) -> bool {
375        self.state == LedgerState::Closed
376    }
377}
378
379pub(crate) trait HasLedgerMetadata {
380    fn metadata(&self) -> &LedgerMetadata;
381
382    fn ensemble_at(&self, entry_id: EntryId) -> (EntryId, &[BookieId], EntryId) {
383        return self.metadata().ensemble_at(entry_id);
384    }
385
386    fn ensemble_iter(&self, entry_id: EntryId) -> EnsembleIterator<'_> {
387        return self.metadata().ensemble_iter(entry_id);
388    }
389
390    fn last_ensemble(&self) -> &LedgerEnsemble {
391        return self.metadata().last_ensemble();
392    }
393
394    fn closed(&self) -> bool {
395        return self.metadata().closed();
396    }
397}
398
399pub struct BorrowedLedgerMetadata {
400    metadata: Guard<Arc<Versioned<LedgerMetadata>>>,
401    _marker: PhantomData<std::rc::Rc<()>>,
402}
403
404assert_not_impl_any!(BorrowedLedgerMetadata: Send, Sync);
405
406impl std::ops::Deref for BorrowedLedgerMetadata {
407    type Target = Versioned<LedgerMetadata>;
408
409    fn deref(&self) -> &Versioned<LedgerMetadata> {
410        &self.metadata
411    }
412}
413
414impl BorrowedLedgerMetadata {
415    pub fn into_owned(self) -> Arc<Versioned<LedgerMetadata>> {
416        Guard::into_inner(self.metadata)
417    }
418}
419
420/// ## Synchronization semantics:
421/// 1. Exposed lac synchronizes `lac` and `metadata`.
422/// 2. It is ok for thread to read delayed data.
423/// 3. In asynchronous rust, code before `.await` happens before code after `.await`.
424/// 4. All to all, read your write and read your read.
425#[derive(Clone)]
426pub struct UpdatingLedgerMetadata {
427    lac: Arc<AtomicEntryId>,
428    metadata: Arc<ArcSwap<Versioned<LedgerMetadata>>>,
429}
430
431impl UpdatingLedgerMetadata {
432    pub fn new(metadata: Versioned<LedgerMetadata>) -> Self {
433        let lac = metadata.last_add_confirmed();
434        Self { lac: Arc::new(lac.into()), metadata: Arc::new(ArcSwap::from_pointee(metadata)) }
435    }
436
437    pub fn closed_entry_id(&self) -> Option<EntryId> {
438        let metadata = self.metadata.load();
439        if metadata.closed() {
440            Some(metadata.last_entry_id)
441        } else {
442            None
443        }
444    }
445
446    pub fn check_read(&self, entry_id: EntryId) -> Result<Arc<Versioned<LedgerMetadata>>> {
447        let (lac, metadata) = self.lac_for_read();
448        if entry_id > lac {
449            return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
450        }
451        Ok(metadata.into_owned())
452    }
453
454    pub fn check_unconfirmed_read(&self, entry_id: EntryId) -> Result<Arc<Versioned<LedgerMetadata>>> {
455        let metadata = self.borrow();
456        if metadata.closed() && entry_id > metadata.last_entry_id {
457            return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
458        }
459        Ok(metadata.into_owned())
460    }
461
462    fn lac_for_read(&self) -> (EntryId, BorrowedLedgerMetadata) {
463        let metadata = self.borrow();
464        if metadata.closed() {
465            (metadata.last_entry_id, metadata)
466        } else {
467            (metadata.last_add_confirmed().max(self.lac.get()), metadata)
468        }
469    }
470
471    pub fn lac(&self) -> EntryId {
472        let metadata = self.borrow();
473        if metadata.closed() {
474            return metadata.last_entry_id;
475        }
476        metadata.last_add_confirmed().max(self.lac.get())
477    }
478
479    pub fn last_confirmed_meta(&self) -> Result<(EntryId, LedgerLength), Arc<Versioned<LedgerMetadata>>> {
480        let metadata = self.borrow();
481        if metadata.closed() {
482            Ok((metadata.last_entry_id, metadata.length))
483        } else {
484            Err(metadata.into_owned())
485        }
486    }
487
488    pub fn update_lac(&self, entry_id: EntryId) -> EntryId {
489        self.lac.update(entry_id)
490    }
491
492    pub fn read(&self) -> Arc<Versioned<LedgerMetadata>> {
493        self.metadata.load_full()
494    }
495
496    pub fn borrow(&self) -> BorrowedLedgerMetadata {
497        BorrowedLedgerMetadata { metadata: self.metadata.load(), _marker: PhantomData }
498    }
499
500    pub fn update(&mut self, metadata: Versioned<LedgerMetadata>) {
501        self.lac.update(metadata.last_add_confirmed());
502        let mut last_metadata = self.metadata.load();
503        if metadata.version <= last_metadata.version {
504            return;
505        }
506        let metadata = Arc::new(metadata);
507        loop {
508            let current_metadata = self.metadata.compare_and_swap(&last_metadata, metadata.clone());
509            if Arc::ptr_eq(&*current_metadata, &*last_metadata) || metadata.version <= current_metadata.version {
510                return;
511            }
512            last_metadata = current_metadata;
513        }
514    }
515}
516
517pub struct LedgerMetadataUpdater {
518    version: MetaVersion,
519    metadata: UpdatingLedgerMetadata,
520}
521
522impl LedgerMetadataUpdater {
523    pub fn new(metadata: Versioned<LedgerMetadata>) -> Self {
524        let version = metadata.version;
525        Self { version, metadata: UpdatingLedgerMetadata::new(metadata) }
526    }
527
528    pub fn subscribe(&self) -> UpdatingLedgerMetadata {
529        self.metadata.clone()
530    }
531
532    pub fn update(&mut self, metadata: Versioned<LedgerMetadata>) {
533        if self.version >= metadata.version {
534            return;
535        }
536        self.version = metadata.version;
537        self.metadata.update(metadata);
538    }
539}
540
541#[derive(Clone, Debug, PartialEq, Eq, Hash)]
542pub struct BookieId(compact_str::CompactString);
543
544impl BookieId {
545    pub fn new(s: &str) -> BookieId {
546        BookieId(s.into())
547    }
548
549    pub fn as_str(&self) -> &str {
550        self.0.as_str()
551    }
552}
553
554impl Display for BookieId {
555    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
556        Display::fmt(&self.0.as_str(), f)
557    }
558}
559
560impl std::ops::Deref for BookieId {
561    type Target = str;
562
563    fn deref(&self) -> &str {
564        self.as_str()
565    }
566}
567
568impl AsRef<str> for BookieId {
569    fn as_ref(&self) -> &str {
570        self.0.as_str()
571    }
572}
573
574impl Borrow<str> for BookieId {
575    fn borrow(&self) -> &str {
576        self.0.as_str()
577    }
578}