bookkeeper_client/client/
mod.rs

1mod bookie;
2mod entry_distribution;
3pub(crate) mod errors;
4pub(crate) mod local_rc;
5pub(crate) mod metadata;
6mod placement;
7mod reader;
8pub mod service_uri;
9mod writer;
10
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use std::time::SystemTime;
14
15use tokio::select;
16use tokio::sync::mpsc;
17
18use self::bookie::PoolledClient;
19use self::entry_distribution::EntryDistribution;
20pub use self::errors::{BkError, ErrorKind, Result};
21pub use self::metadata::{
22    BookieId,
23    DigestType,
24    EntryId,
25    LedgerEnsemble,
26    LedgerId,
27    LedgerLength,
28    LedgerMetadata,
29    LedgerState,
30};
31use self::metadata::{LedgerMetadataUpdater, UpdatingLedgerMetadata};
32use self::placement::{EnsembleOptions, PlacementPolicy, RandomPlacementPolicy};
33pub use self::reader::{LacOptions, LedgerReader, PollOptions, ReadOptions};
34use self::service_uri::ServiceUri;
35pub use self::writer::{CloseOptions, LedgerAppender};
36use self::writer::{LedgerWriter, WriteRequest, WriterOptions};
37use super::digest::{self, Algorithm as DigestAlgorithm};
38use super::meta::util::BookieRegistry;
39use super::meta::{
40    EtcdConfiguration,
41    EtcdMetaStore,
42    LedgerMetadataStream,
43    MetaStore,
44    MetaVersion,
45    Versioned,
46    ZkConfiguration,
47    ZkMetaStore,
48};
49use crate::utils::{self, DropOwner, DropWatcher};
50
51/// Options to create ledger.
52#[derive(Clone, Debug)]
53#[non_exhaustive]
54pub struct CreateOptions {
55    ledger_id: Option<LedgerId>,
56    ensemble_size: u32,
57    write_quorum_size: u32,
58    ack_quorum_size: u32,
59    password: Vec<u8>,
60    custom_metadata: HashMap<String, Vec<u8>>,
61    digest_type: DigestType,
62
63    deferred_sync: bool,
64}
65
66impl CreateOptions {
67    fn validate(&self) -> Result<()> {
68        if self.ensemble_size >= self.write_quorum_size && self.write_quorum_size >= self.ack_quorum_size {
69            return Ok(());
70        }
71        let msg = format!(
72            "unfulfilled ensemble requirement: ensemble_size({}) >= write_quorum_size({}) >= ack_quorum_size({})",
73            self.ensemble_size, self.write_quorum_size, self.ack_quorum_size
74        );
75        Err(BkError::with_message(ErrorKind::InvalidMetadata, msg))
76    }
77
78    /// Constructs options for ledger creation.
79    pub fn new(ensemble_size: usize, write_quorum_size: usize, ack_quorum_size: usize) -> CreateOptions {
80        CreateOptions {
81            ledger_id: None,
82            ensemble_size: ensemble_size as u32,
83            write_quorum_size: write_quorum_size as u32,
84            ack_quorum_size: ack_quorum_size as u32,
85            password: Default::default(),
86            custom_metadata: Default::default(),
87            digest_type: DigestType::DUMMY,
88            deferred_sync: false,
89        }
90    }
91
92    pub fn digest(self, digest_type: DigestType, password: Option<Vec<u8>>) -> Self {
93        CreateOptions { digest_type, password: password.unwrap_or_default(), ..self }
94    }
95
96    pub fn ledger_id(self, ledger_id: LedgerId) -> Self {
97        CreateOptions { ledger_id: Some(ledger_id), ..self }
98    }
99
100    pub fn custom_metadata(self, metadata: HashMap<String, Vec<u8>>) -> Self {
101        CreateOptions { custom_metadata: metadata, ..self }
102    }
103
104    /// Don't wait for `fsync` after entry written. Use it with caution. See [LedgerAppender::force].
105    pub fn deferred_sync(self) -> Self {
106        CreateOptions { deferred_sync: true, ..self }
107    }
108}
109
110/// Options to open ledger.
111#[derive(Clone)]
112#[non_exhaustive]
113pub struct OpenOptions<'a> {
114    recovery: bool,
115    password: &'a [u8],
116    digest_type: DigestType,
117    administrative: bool,
118}
119
120impl<'a> OpenOptions<'a> {
121    /// Constructs options for opening ledger.
122    pub fn new(digest_type: DigestType, password: Option<&'a [u8]>) -> OpenOptions<'a> {
123        OpenOptions { recovery: false, digest_type, password: password.unwrap_or_default(), administrative: false }
124    }
125
126    /// Recovers(aka. fence and close) possible writing ledger in opening.
127    pub fn recovery(self) -> Self {
128        OpenOptions { recovery: true, ..self }
129    }
130
131    /// Grants adminstrative to bypass digest and password check.
132    pub fn administrative(self) -> Self {
133        OpenOptions { administrative: true, ..self }
134    }
135}
136
/// Options to delete ledger.
///
/// Carries no settings at present; construct it with `DeleteOptions::default()`.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct DeleteOptions {}
141
/// Configuration for BookKeeper client.
///
/// Derives `Debug`, like the other option types in this module, so that a configuration
/// can be logged or embedded in `Debug` output of types holding it.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Configuration {
    /// Service uri; parsed when the client is constructed, where its scheme selects the
    /// meta store implementation.
    service_uri: String,
    /// Optional static bookie cluster description. When present, the bookie registry is
    /// built from it instead of from the meta store.
    bookies: Option<String>,
}
149
150impl Configuration {
151    /// Constructs configuration with given service uri.
152    pub fn new(service_uri: String) -> Configuration {
153        Configuration { service_uri, bookies: None }
154    }
155
156    /// Specifies static bookie cluster.
157    pub fn bookies(self, bookies: String) -> Self {
158        Configuration { bookies: Some(bookies), ..self }
159    }
160}
161
162/// BookKeeper client.
163#[derive(Clone)]
164pub struct BookKeeper {
165    meta_store: Arc<dyn MetaStore>,
166    bookie_client: Arc<PoolledClient>,
167    placement_policy: Arc<RandomPlacementPolicy>,
168}
169
170async fn relay_metadata_stream(
171    mut updater: LedgerMetadataUpdater,
172    mut stream: Box<dyn LedgerMetadataStream>,
173    mut drop_watcher: DropWatcher,
174) {
175    loop {
176        select! {
177            _ = drop_watcher.dropped() => break,
178            r = stream.next() => match r {
179                    Err(_) => continue,
180                    Ok(metadata) => updater.update(metadata),
181            },
182        }
183    }
184    stream.cancel().await;
185}
186
187async fn merge_metadata_stream_and_updates(
188    mut updater: LedgerMetadataUpdater,
189    mut stream: Box<dyn LedgerMetadataStream>,
190    mut updates: mpsc::Receiver<Versioned<LedgerMetadata>>,
191    mut drop_watcher: DropWatcher,
192) {
193    loop {
194        select! {
195            _ = drop_watcher.dropped() => break,
196            r = stream.next() => match r {
197                    Err(_) => continue,
198                    Ok(metadata) => updater.update(metadata),
199            },
200            r = updates.recv() =>
201                match r {
202                    Some(metadata) => updater.update(metadata),
203                    None => {
204                        tokio::spawn(async move {
205                            relay_metadata_stream(updater, stream, drop_watcher).await;
206                        });
207                        return;
208                    }
209                } ,
210        }
211    }
212    stream.cancel().await;
213}
214
215fn watch_metadata_stream(
216    metadata: Versioned<LedgerMetadata>,
217    stream: Box<dyn LedgerMetadataStream>,
218) -> (UpdatingLedgerMetadata, DropOwner) {
219    let updater = LedgerMetadataUpdater::new(metadata);
220    let updating = updater.subscribe();
221    let (drop_owner, drop_watcher) = utils::drop_watcher();
222    tokio::spawn(async move { relay_metadata_stream(updater, stream, drop_watcher).await });
223    (updating, drop_owner)
224}
225
226fn watch_metadata_stream_and_updates(
227    metadata: Versioned<LedgerMetadata>,
228    stream: Box<dyn LedgerMetadataStream>,
229    updates: mpsc::Receiver<Versioned<LedgerMetadata>>,
230) -> (UpdatingLedgerMetadata, DropOwner) {
231    let updater = LedgerMetadataUpdater::new(metadata);
232    let updating = updater.subscribe();
233    let (drop_owner, drop_watcher) = utils::drop_watcher();
234    tokio::spawn(async move { merge_metadata_stream_and_updates(updater, stream, updates, drop_watcher).await });
235    (updating, drop_owner)
236}
237
238impl BookKeeper {
239    /// Constructs BookKeeper client with given configuration.
240    pub async fn new(config: Configuration) -> Result<BookKeeper> {
241        let service_uri = config.service_uri.parse::<ServiceUri>()?;
242        let bookie_registry = match &config.bookies {
243            None => None,
244            Some(bookie_addresses) => Some(BookieRegistry::with_bookies(bookie_addresses)?),
245        };
246        let (meta_store, bookie_registry): (Arc<dyn MetaStore>, _) = if service_uri.scheme == "etcd" {
247            let endpoints = [service_uri.address.as_str()];
248            let etcd_configuration = EtcdConfiguration::new(service_uri.path);
249            let mut meta_store = EtcdMetaStore::connect(&endpoints, etcd_configuration).await?;
250            let bookie_registry = match bookie_registry {
251                None => BookieRegistry::new(&mut meta_store).await?,
252                Some(bookie_registry) => bookie_registry,
253            };
254            (Arc::new(meta_store), bookie_registry)
255        } else if service_uri.scheme == "zk" {
256            let zk_configuration = ZkConfiguration::from_service_uri(service_uri)?;
257            let mut meta_store = ZkMetaStore::new(zk_configuration).await?;
258            let bookie_registry = match bookie_registry {
259                None => BookieRegistry::new(&mut meta_store).await?,
260                Some(bookie_registry) => bookie_registry,
261            };
262            (Arc::new(meta_store), bookie_registry)
263        } else {
264            let msg = format!("unknown service scheme {}", service_uri.scheme);
265            return Err(BkError::with_message(ErrorKind::InvalidServiceUri, msg));
266        };
267        let placement_policy = RandomPlacementPolicy::new(bookie_registry.clone());
268        let poolled_client = Arc::new(PoolledClient::new(bookie_registry));
269        let bookkeeper =
270            BookKeeper { meta_store, bookie_client: poolled_client, placement_policy: Arc::new(placement_policy) };
271        Ok(bookkeeper)
272    }
273
274    /// Opens ledger for reading.
275    pub async fn open_ledger(&self, ledger_id: LedgerId, options: &OpenOptions<'_>) -> Result<LedgerReader> {
276        let metadata = self.meta_store.read_ledger_metadata(ledger_id).await?;
277        let entry_distribution = EntryDistribution::from_metadata(&metadata);
278        if !options.administrative
279            && (options.digest_type != metadata.digest_type || options.password != metadata.password)
280        {
281            return Err(BkError::new(ErrorKind::UnauthorizedAccess));
282        }
283        let closed = metadata.closed();
284        let needs_recovery = options.recovery && !closed;
285        let digest_algorithm = DigestAlgorithm::new(metadata.digest_type, &metadata.password);
286        let master_key = digest::generate_master_key(&metadata.password);
287        let metadata_stream = self.meta_store.watch_ledger_metadata(ledger_id, metadata.version).await?;
288        let (metadata, drop_owner, metadata_sender) = if needs_recovery {
289            let (metadata_sender, metadata_receiver) = mpsc::channel(128);
290            let (metadata, drop_owner) =
291                watch_metadata_stream_and_updates(metadata, metadata_stream, metadata_receiver);
292            (metadata, drop_owner, Some(metadata_sender))
293        } else {
294            let (metadata, drop_owner) = watch_metadata_stream(metadata, metadata_stream);
295            (metadata, drop_owner, None)
296        };
297        let mut ledger = LedgerReader {
298            ledger_id,
299            metadata,
300            client: self.bookie_client.clone(),
301            entry_distribution,
302            master_key,
303            digest_algorithm,
304            _drop_owner: drop_owner.into(),
305        };
306        if let Some(metadata_sender) = metadata_sender {
307            ledger.recover(metadata_sender, &self.meta_store, self.placement_policy.clone()).await?;
308        }
309        Ok(ledger)
310    }
311
312    /// Creates ledger for appending.
313    pub async fn create_ledger(&self, options: CreateOptions) -> Result<LedgerAppender> {
314        options.validate()?;
315        let ledger_id = if let Some(ledger_id) = options.ledger_id {
316            ledger_id
317        } else {
318            self.meta_store.generate_ledger_id().await?
319        };
320        let ensemble = self.placement_policy.select_ensemble(&EnsembleOptions {
321            ensemble_size: options.ensemble_size,
322            write_quorum: options.write_quorum_size,
323            ack_quorum: options.ack_quorum_size,
324            custom_metadata: &options.custom_metadata,
325            preferred_bookies: &[],
326            excluded_bookies: HashSet::new(),
327        })?;
328        let metadata = LedgerMetadata {
329            ledger_id,
330            length: LedgerLength::ZERO,
331            last_entry_id: EntryId::INVALID,
332            state: LedgerState::Open,
333            password: options.password,
334            ensemble_size: options.ensemble_size,
335            write_quorum_size: options.write_quorum_size,
336            ack_quorum_size: options.ack_quorum_size,
337            ensembles: vec![LedgerEnsemble { first_entry_id: EntryId::MIN, bookies: ensemble }],
338            digest_type: options.digest_type,
339            custom_metadata: options.custom_metadata,
340            format_version: 3,
341            creation_time: Some(SystemTime::now()),
342            creator_token: (rand::random::<usize>() & i64::MAX as usize) as i64,
343        };
344        let version = self.meta_store.create_ledger_metadata(&metadata).await?;
345        let master_key = digest::generate_master_key(&metadata.password);
346        let digest_algorithm = DigestAlgorithm::new(metadata.digest_type, &metadata.password);
347        let writer_options = WriterOptions {
348            deferred_sync: options.deferred_sync,
349            master_key,
350            digest_algorithm: digest_algorithm.clone(),
351        };
352        let entry_distribution = EntryDistribution::from_metadata(&metadata);
353        let (request_sender, metadata, drop_owner) =
354            self.start_ledger_writer(writer_options, version, metadata).await?;
355        Ok(LedgerAppender {
356            reader: LedgerReader {
357                ledger_id,
358                metadata,
359                client: self.bookie_client.clone(),
360                entry_distribution,
361                master_key,
362                digest_algorithm,
363                _drop_owner: drop_owner.into(),
364            },
365            last_add_entry_id: Arc::new(EntryId::INVALID.into()),
366            request_sender,
367        })
368    }
369
370    async fn start_ledger_writer(
371        &self,
372        options: WriterOptions,
373        version: MetaVersion,
374        metadata: LedgerMetadata,
375    ) -> Result<(mpsc::UnboundedSender<WriteRequest>, UpdatingLedgerMetadata, DropOwner)> {
376        let (request_sender, request_receiver) = mpsc::unbounded_channel();
377        let metadata_stream = self.meta_store.watch_ledger_metadata(metadata.ledger_id, version).await?;
378        let (metadata_sender, metadata_receiver) = mpsc::channel(128);
379        let (updating_metadata, drop_owner) = watch_metadata_stream_and_updates(
380            Versioned::new(version, metadata.clone()),
381            metadata_stream,
382            metadata_receiver,
383        );
384        let metadata = updating_metadata.borrow();
385        let writer = LedgerWriter {
386            ledger_id: metadata.ledger_id,
387            client: self.bookie_client.clone(),
388            deferred_sync: options.deferred_sync,
389            entry_distribution: EntryDistribution::from_metadata(&metadata),
390            master_key: options.master_key,
391            digest_algorithm: options.digest_algorithm,
392            meta_store: self.meta_store.clone(),
393            placement_policy: self.placement_policy.clone(),
394        };
395        let metadata = metadata.clone();
396        tokio::spawn(async move {
397            writer.write_state_loop(metadata, EntryId::INVALID, 0i64.into(), request_receiver, metadata_sender).await;
398        });
399        Ok((request_sender, updating_metadata, drop_owner))
400    }
401
402    /// Deletes ledger with given id.
403    ///
404    /// # Notable errors
405    /// * [ErrorKind::LedgerNotExisted] if no such ledger.
406    pub async fn delete_ledger(&self, ledger_id: LedgerId, _options: DeleteOptions) -> Result<()> {
407        self.meta_store.remove_ledger_metadata(ledger_id, None).await?;
408        Ok(())
409    }
410}