bookkeeper_client/client/
metadata.rs1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::fmt::{self, Display, Formatter};
4use std::marker::PhantomData;
5use std::sync::Arc;
6use std::time::SystemTime;
7
8use arc_swap::{ArcSwap, Guard};
9use atomic::Atomic;
10use static_assertions::assert_not_impl_any;
11
12use super::errors::{BkError, ErrorKind};
13use crate::meta::{MetaVersion, Versioned};
14
15type Result<T, E = BkError> = std::result::Result<T, E>;
16
17#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
19pub struct LedgerId(pub(crate) i64);
20
21impl TryFrom<i64> for LedgerId {
22 type Error = BkError;
23
24 fn try_from(i: i64) -> Result<LedgerId> {
25 if i < 0 {
26 return Err(BkError::new(ErrorKind::InvalidLedgerId));
27 }
28 Ok(LedgerId(i))
29 }
30}
31
32impl From<LedgerId> for i64 {
33 fn from(ledger_id: LedgerId) -> Self {
34 ledger_id.0
35 }
36}
37
38impl PartialEq<i64> for LedgerId {
39 fn eq(&self, other: &i64) -> bool {
40 self.0.eq(other)
41 }
42}
43
44impl Display for LedgerId {
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
46 Display::fmt(&self.0, f)
47 }
48}
49
50#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
52pub struct LedgerLength(i64);
53
54impl LedgerLength {
55 pub const ZERO: LedgerLength = LedgerLength(0);
56}
57
58impl From<i64> for LedgerLength {
59 fn from(i: i64) -> LedgerLength {
60 LedgerLength(i)
61 }
62}
63
64impl From<LedgerLength> for i64 {
65 fn from(ledger_length: LedgerLength) -> i64 {
66 ledger_length.0
67 }
68}
69
70impl From<usize> for LedgerLength {
71 fn from(u: usize) -> LedgerLength {
72 LedgerLength(u as i64)
73 }
74}
75
76impl Display for LedgerLength {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
78 Display::fmt(&self.0, f)
79 }
80}
81
82impl std::ops::Sub<i64> for LedgerLength {
83 type Output = Self;
84
85 fn sub(self, rhs: i64) -> LedgerLength {
86 LedgerLength(self.0 - rhs)
87 }
88}
89
90impl std::ops::Add<i64> for LedgerLength {
91 type Output = Self;
92
93 fn add(self, rhs: i64) -> LedgerLength {
94 LedgerLength(self.0 + rhs)
95 }
96}
97
98impl std::ops::SubAssign<i64> for LedgerLength {
99 fn sub_assign(&mut self, rhs: i64) {
100 self.0 -= rhs;
101 }
102}
103
104impl std::ops::AddAssign<i64> for LedgerLength {
105 fn add_assign(&mut self, rhs: i64) {
106 self.0 += rhs;
107 }
108}
109
110impl std::ops::Sub<usize> for LedgerLength {
111 type Output = Self;
112
113 fn sub(self, rhs: usize) -> LedgerLength {
114 LedgerLength(self.0 - rhs as i64)
115 }
116}
117
118impl std::ops::Add<usize> for LedgerLength {
119 type Output = Self;
120
121 fn add(self, rhs: usize) -> LedgerLength {
122 LedgerLength(self.0 + rhs as i64)
123 }
124}
125
126impl std::ops::SubAssign<usize> for LedgerLength {
127 fn sub_assign(&mut self, rhs: usize) {
128 self.0 -= rhs as i64;
129 }
130}
131
132impl std::ops::AddAssign<usize> for LedgerLength {
133 fn add_assign(&mut self, rhs: usize) {
134 self.0 += rhs as i64;
135 }
136}
137
138#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
140#[repr(transparent)]
141#[derive(bytemuck::NoUninit)]
142pub struct EntryId(pub(crate) i64);
143
144impl Display for EntryId {
145 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
146 Display::fmt(&self.0, f)
147 }
148}
149
150impl std::cmp::PartialEq<i64> for EntryId {
151 fn eq(&self, other: &i64) -> bool {
152 self.0.eq(other)
153 }
154}
155
156impl std::cmp::PartialOrd<i64> for EntryId {
157 fn partial_cmp(&self, other: &i64) -> Option<std::cmp::Ordering> {
158 self.0.partial_cmp(other)
159 }
160}
161
162impl std::ops::Sub for EntryId {
163 type Output = i64;
164
165 fn sub(self, rhs: EntryId) -> i64 {
166 self.0 - rhs.0
167 }
168}
169
170impl std::ops::Sub<i64> for EntryId {
171 type Output = Self;
172
173 fn sub(self, rhs: i64) -> EntryId {
174 EntryId(self.0 - rhs)
175 }
176}
177
178impl std::ops::Add<i64> for EntryId {
179 type Output = Self;
180
181 fn add(self, rhs: i64) -> EntryId {
182 EntryId(self.0 + rhs)
183 }
184}
185
186impl std::ops::SubAssign<i64> for EntryId {
187 fn sub_assign(&mut self, rhs: i64) {
188 self.0 -= rhs;
189 }
190}
191
192impl std::ops::AddAssign<i64> for EntryId {
193 fn add_assign(&mut self, rhs: i64) {
194 self.0 += rhs;
195 }
196}
197
198impl EntryId {
199 pub const INVALID: EntryId = EntryId(-1);
201 pub const MIN: EntryId = EntryId(0);
203
204 pub const fn is_valid(&self) -> bool {
206 self.0 >= 0
207 }
208
209 pub const unsafe fn unchecked_from_i64(i: i64) -> EntryId {
214 EntryId(i)
215 }
216}
217
218impl TryFrom<i64> for EntryId {
219 type Error = BkError;
220
221 fn try_from(i: i64) -> Result<EntryId> {
222 if i < 0 {
223 return Err(BkError::new(ErrorKind::InvalidEntryId));
224 }
225 Ok(EntryId(i))
226 }
227}
228
229impl From<EntryId> for i64 {
230 fn from(entry_id: EntryId) -> i64 {
231 entry_id.0
232 }
233}
234
235pub(crate) struct AtomicEntryId {
236 entry_id: atomic::Atomic<EntryId>,
237}
238
239impl AtomicEntryId {
240 pub fn get(&self) -> EntryId {
241 self.entry_id.load(atomic::Ordering::Relaxed)
242 }
243
244 pub fn update(&self, entry_id: EntryId) -> EntryId {
245 let mut current = self.get();
246 if entry_id <= current {
247 return current;
248 }
249 while entry_id > current {
250 match self.entry_id.compare_exchange(
251 current,
252 entry_id,
253 atomic::Ordering::Relaxed,
254 atomic::Ordering::Relaxed,
255 ) {
256 Ok(_) => return entry_id,
257 Err(new_entry_id) => current = new_entry_id,
258 }
259 }
260 current
261 }
262}
263
264impl From<EntryId> for AtomicEntryId {
265 fn from(entry_id: EntryId) -> Self {
266 Self { entry_id: Atomic::new(entry_id) }
267 }
268}
269
270#[derive(Clone, Debug, Eq, PartialEq)]
271pub struct LedgerEnsemble {
272 pub(crate) first_entry_id: EntryId,
273 pub(crate) bookies: Vec<BookieId>,
274}
275
276#[derive(Clone, Debug)]
278pub struct LedgerMetadata {
279 pub ledger_id: LedgerId,
280 pub length: LedgerLength,
281 pub last_entry_id: EntryId,
282 pub state: LedgerState,
283 pub password: Vec<u8>,
284 pub ensemble_size: u32,
285 pub write_quorum_size: u32,
286 pub ack_quorum_size: u32,
287 pub ensembles: Vec<LedgerEnsemble>,
288 pub digest_type: DigestType,
289 pub creation_time: Option<SystemTime>,
290 pub creator_token: i64,
291 pub custom_metadata: HashMap<String, Vec<u8>>,
292 pub format_version: i32,
293}
294
295#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
296pub enum LedgerState {
297 Open,
298 InRecovery,
299 Closed,
300}
301
302#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
304#[non_exhaustive]
305pub enum DigestType {
306 CRC32,
307 MAC,
308 CRC32C,
309 DUMMY,
310}
311
312pub struct EnsembleIterator<'a> {
313 next: usize,
314 ensembles: &'a [LedgerEnsemble],
315}
316
317impl<'a> Iterator for EnsembleIterator<'a> {
318 type Item = (EntryId, &'a [BookieId], EntryId);
319
320 fn next(&mut self) -> Option<Self::Item> {
321 if self.next >= self.ensembles.len() {
322 return None;
323 }
324 let ensemble = &self.ensembles[self.next];
325 self.next += 1;
326 let next_ensemble_entry_id =
327 if self.next >= self.ensembles.len() { EntryId::INVALID } else { self.ensembles[self.next].first_entry_id };
328 Some((ensemble.first_entry_id, &ensemble.bookies, next_ensemble_entry_id))
329 }
330}
331
332impl LedgerMetadata {
333 pub fn ensemble_at(&self, entry_id: EntryId) -> (EntryId, &[BookieId], EntryId) {
335 assert!(entry_id >= EntryId::MIN);
336 assert!(!self.ensembles.is_empty());
337 assert!(self.ensembles[0].first_entry_id == EntryId::MIN);
338 let i = match self.ensembles.binary_search_by_key(&entry_id, |e| e.first_entry_id) {
339 Ok(i) => i,
340 Err(i) => i - 1,
341 };
342 if i + 1 == self.ensembles.len() {
343 (self.ensembles[i].first_entry_id, &self.ensembles[i].bookies, EntryId::INVALID)
344 } else {
345 (self.ensembles[i].first_entry_id, &self.ensembles[i].bookies, self.ensembles[i + 1].first_entry_id)
346 }
347 }
348
349 pub fn ensemble_iter(&self, entry_id: EntryId) -> EnsembleIterator<'_> {
350 assert!(entry_id >= EntryId::MIN);
351 assert!(!self.ensembles.is_empty());
352 assert!(self.ensembles[0].first_entry_id == EntryId::MIN);
353 let i = match self.ensembles.binary_search_by_key(&entry_id, |e| e.first_entry_id) {
354 Ok(i) => i,
355 Err(i) => i - 1,
356 };
357 EnsembleIterator { next: i, ensembles: &self.ensembles }
358 }
359
360 pub fn last_ensemble(&self) -> &LedgerEnsemble {
361 &self.ensembles[self.ensembles.len() - 1]
362 }
363
364 pub fn last_add_confirmed(&self) -> EntryId {
365 if self.closed() {
366 self.last_entry_id
367 } else if self.ensembles.is_empty() {
368 EntryId::INVALID
369 } else {
370 self.last_ensemble().first_entry_id - 1
371 }
372 }
373
374 pub fn closed(&self) -> bool {
375 self.state == LedgerState::Closed
376 }
377}
378
379pub(crate) trait HasLedgerMetadata {
380 fn metadata(&self) -> &LedgerMetadata;
381
382 fn ensemble_at(&self, entry_id: EntryId) -> (EntryId, &[BookieId], EntryId) {
383 return self.metadata().ensemble_at(entry_id);
384 }
385
386 fn ensemble_iter(&self, entry_id: EntryId) -> EnsembleIterator<'_> {
387 return self.metadata().ensemble_iter(entry_id);
388 }
389
390 fn last_ensemble(&self) -> &LedgerEnsemble {
391 return self.metadata().last_ensemble();
392 }
393
394 fn closed(&self) -> bool {
395 return self.metadata().closed();
396 }
397}
398
399pub struct BorrowedLedgerMetadata {
400 metadata: Guard<Arc<Versioned<LedgerMetadata>>>,
401 _marker: PhantomData<std::rc::Rc<()>>,
402}
403
404assert_not_impl_any!(BorrowedLedgerMetadata: Send, Sync);
405
406impl std::ops::Deref for BorrowedLedgerMetadata {
407 type Target = Versioned<LedgerMetadata>;
408
409 fn deref(&self) -> &Versioned<LedgerMetadata> {
410 &self.metadata
411 }
412}
413
414impl BorrowedLedgerMetadata {
415 pub fn into_owned(self) -> Arc<Versioned<LedgerMetadata>> {
416 Guard::into_inner(self.metadata)
417 }
418}
419
420#[derive(Clone)]
426pub struct UpdatingLedgerMetadata {
427 lac: Arc<AtomicEntryId>,
428 metadata: Arc<ArcSwap<Versioned<LedgerMetadata>>>,
429}
430
431impl UpdatingLedgerMetadata {
432 pub fn new(metadata: Versioned<LedgerMetadata>) -> Self {
433 let lac = metadata.last_add_confirmed();
434 Self { lac: Arc::new(lac.into()), metadata: Arc::new(ArcSwap::from_pointee(metadata)) }
435 }
436
437 pub fn closed_entry_id(&self) -> Option<EntryId> {
438 let metadata = self.metadata.load();
439 if metadata.closed() {
440 Some(metadata.last_entry_id)
441 } else {
442 None
443 }
444 }
445
446 pub fn check_read(&self, entry_id: EntryId) -> Result<Arc<Versioned<LedgerMetadata>>> {
447 let (lac, metadata) = self.lac_for_read();
448 if entry_id > lac {
449 return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
450 }
451 Ok(metadata.into_owned())
452 }
453
454 pub fn check_unconfirmed_read(&self, entry_id: EntryId) -> Result<Arc<Versioned<LedgerMetadata>>> {
455 let metadata = self.borrow();
456 if metadata.closed() && entry_id > metadata.last_entry_id {
457 return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
458 }
459 Ok(metadata.into_owned())
460 }
461
462 fn lac_for_read(&self) -> (EntryId, BorrowedLedgerMetadata) {
463 let metadata = self.borrow();
464 if metadata.closed() {
465 (metadata.last_entry_id, metadata)
466 } else {
467 (metadata.last_add_confirmed().max(self.lac.get()), metadata)
468 }
469 }
470
471 pub fn lac(&self) -> EntryId {
472 let metadata = self.borrow();
473 if metadata.closed() {
474 return metadata.last_entry_id;
475 }
476 metadata.last_add_confirmed().max(self.lac.get())
477 }
478
479 pub fn last_confirmed_meta(&self) -> Result<(EntryId, LedgerLength), Arc<Versioned<LedgerMetadata>>> {
480 let metadata = self.borrow();
481 if metadata.closed() {
482 Ok((metadata.last_entry_id, metadata.length))
483 } else {
484 Err(metadata.into_owned())
485 }
486 }
487
488 pub fn update_lac(&self, entry_id: EntryId) -> EntryId {
489 self.lac.update(entry_id)
490 }
491
492 pub fn read(&self) -> Arc<Versioned<LedgerMetadata>> {
493 self.metadata.load_full()
494 }
495
496 pub fn borrow(&self) -> BorrowedLedgerMetadata {
497 BorrowedLedgerMetadata { metadata: self.metadata.load(), _marker: PhantomData }
498 }
499
500 pub fn update(&mut self, metadata: Versioned<LedgerMetadata>) {
501 self.lac.update(metadata.last_add_confirmed());
502 let mut last_metadata = self.metadata.load();
503 if metadata.version <= last_metadata.version {
504 return;
505 }
506 let metadata = Arc::new(metadata);
507 loop {
508 let current_metadata = self.metadata.compare_and_swap(&last_metadata, metadata.clone());
509 if Arc::ptr_eq(&*current_metadata, &*last_metadata) || metadata.version <= current_metadata.version {
510 return;
511 }
512 last_metadata = current_metadata;
513 }
514 }
515}
516
517pub struct LedgerMetadataUpdater {
518 version: MetaVersion,
519 metadata: UpdatingLedgerMetadata,
520}
521
522impl LedgerMetadataUpdater {
523 pub fn new(metadata: Versioned<LedgerMetadata>) -> Self {
524 let version = metadata.version;
525 Self { version, metadata: UpdatingLedgerMetadata::new(metadata) }
526 }
527
528 pub fn subscribe(&self) -> UpdatingLedgerMetadata {
529 self.metadata.clone()
530 }
531
532 pub fn update(&mut self, metadata: Versioned<LedgerMetadata>) {
533 if self.version >= metadata.version {
534 return;
535 }
536 self.version = metadata.version;
537 self.metadata.update(metadata);
538 }
539}
540
541#[derive(Clone, Debug, PartialEq, Eq, Hash)]
542pub struct BookieId(compact_str::CompactString);
543
544impl BookieId {
545 pub fn new(s: &str) -> BookieId {
546 BookieId(s.into())
547 }
548
549 pub fn as_str(&self) -> &str {
550 self.0.as_str()
551 }
552}
553
554impl Display for BookieId {
555 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), fmt::Error> {
556 Display::fmt(&self.0.as_str(), f)
557 }
558}
559
560impl std::ops::Deref for BookieId {
561 type Target = str;
562
563 fn deref(&self) -> &str {
564 self.as_str()
565 }
566}
567
568impl AsRef<str> for BookieId {
569 fn as_ref(&self) -> &str {
570 self.0.as_str()
571 }
572}
573
574impl Borrow<str> for BookieId {
575 fn borrow(&self) -> &str {
576 self.0.as_str()
577 }
578}