1mod bookie;
2mod entry_distribution;
3pub(crate) mod errors;
4pub(crate) mod local_rc;
5pub(crate) mod metadata;
6mod placement;
7mod reader;
8pub mod service_uri;
9mod writer;
10
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use std::time::SystemTime;
14
15use tokio::select;
16use tokio::sync::mpsc;
17
18use self::bookie::PoolledClient;
19use self::entry_distribution::EntryDistribution;
20pub use self::errors::{BkError, ErrorKind, Result};
21pub use self::metadata::{
22 BookieId,
23 DigestType,
24 EntryId,
25 LedgerEnsemble,
26 LedgerId,
27 LedgerLength,
28 LedgerMetadata,
29 LedgerState,
30};
31use self::metadata::{LedgerMetadataUpdater, UpdatingLedgerMetadata};
32use self::placement::{EnsembleOptions, PlacementPolicy, RandomPlacementPolicy};
33pub use self::reader::{LacOptions, LedgerReader, PollOptions, ReadOptions};
34use self::service_uri::ServiceUri;
35pub use self::writer::{CloseOptions, LedgerAppender};
36use self::writer::{LedgerWriter, WriteRequest, WriterOptions};
37use super::digest::{self, Algorithm as DigestAlgorithm};
38use super::meta::util::BookieRegistry;
39use super::meta::{
40 EtcdConfiguration,
41 EtcdMetaStore,
42 LedgerMetadataStream,
43 MetaStore,
44 MetaVersion,
45 Versioned,
46 ZkConfiguration,
47 ZkMetaStore,
48};
49use crate::utils::{self, DropOwner, DropWatcher};
50
51#[derive(Clone, Debug)]
53#[non_exhaustive]
54pub struct CreateOptions {
55 ledger_id: Option<LedgerId>,
56 ensemble_size: u32,
57 write_quorum_size: u32,
58 ack_quorum_size: u32,
59 password: Vec<u8>,
60 custom_metadata: HashMap<String, Vec<u8>>,
61 digest_type: DigestType,
62
63 deferred_sync: bool,
64}
65
66impl CreateOptions {
67 fn validate(&self) -> Result<()> {
68 if self.ensemble_size >= self.write_quorum_size && self.write_quorum_size >= self.ack_quorum_size {
69 return Ok(());
70 }
71 let msg = format!(
72 "unfulfilled ensemble requirement: ensemble_size({}) >= write_quorum_size({}) >= ack_quorum_size({})",
73 self.ensemble_size, self.write_quorum_size, self.ack_quorum_size
74 );
75 Err(BkError::with_message(ErrorKind::InvalidMetadata, msg))
76 }
77
78 pub fn new(ensemble_size: usize, write_quorum_size: usize, ack_quorum_size: usize) -> CreateOptions {
80 CreateOptions {
81 ledger_id: None,
82 ensemble_size: ensemble_size as u32,
83 write_quorum_size: write_quorum_size as u32,
84 ack_quorum_size: ack_quorum_size as u32,
85 password: Default::default(),
86 custom_metadata: Default::default(),
87 digest_type: DigestType::DUMMY,
88 deferred_sync: false,
89 }
90 }
91
92 pub fn digest(self, digest_type: DigestType, password: Option<Vec<u8>>) -> Self {
93 CreateOptions { digest_type, password: password.unwrap_or_default(), ..self }
94 }
95
96 pub fn ledger_id(self, ledger_id: LedgerId) -> Self {
97 CreateOptions { ledger_id: Some(ledger_id), ..self }
98 }
99
100 pub fn custom_metadata(self, metadata: HashMap<String, Vec<u8>>) -> Self {
101 CreateOptions { custom_metadata: metadata, ..self }
102 }
103
104 pub fn deferred_sync(self) -> Self {
106 CreateOptions { deferred_sync: true, ..self }
107 }
108}
109
110#[derive(Clone)]
112#[non_exhaustive]
113pub struct OpenOptions<'a> {
114 recovery: bool,
115 password: &'a [u8],
116 digest_type: DigestType,
117 administrative: bool,
118}
119
120impl<'a> OpenOptions<'a> {
121 pub fn new(digest_type: DigestType, password: Option<&'a [u8]>) -> OpenOptions<'a> {
123 OpenOptions { recovery: false, digest_type, password: password.unwrap_or_default(), administrative: false }
124 }
125
126 pub fn recovery(self) -> Self {
128 OpenOptions { recovery: true, ..self }
129 }
130
131 pub fn administrative(self) -> Self {
133 OpenOptions { administrative: true, ..self }
134 }
135}
136
/// Options for [`BookKeeper::delete_ledger`].
///
/// Currently carries no settings. Because of `#[non_exhaustive]`, code outside this crate
/// cannot write `DeleteOptions {}` and must construct it with `Default::default()`. This
/// leaves room to add fields later without breaking callers.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct DeleteOptions {}
141
/// Client configuration consumed by [`BookKeeper::new`].
///
/// The service URI selects and locates the metadata store. An optional static bookie
/// list, when present, is used instead of loading bookies from that store.
#[derive(Clone)]
#[non_exhaustive]
pub struct Configuration {
    /// Metadata service URI; parsed into a `ServiceUri` by `BookKeeper::new`.
    service_uri: String,
    /// Optional static bookie address list, handed to `BookieRegistry::with_bookies`.
    bookies: Option<String>,
}

impl Configuration {
    /// Creates a configuration for `service_uri` without a static bookie list.
    pub fn new(service_uri: String) -> Configuration {
        Self { bookies: None, service_uri }
    }

    /// Sets `bookies` as the static bookie list, replacing any previous value.
    pub fn bookies(mut self, bookies: String) -> Self {
        self.bookies = Some(bookies);
        self
    }
}
161
/// Client handle for creating, opening and deleting ledgers.
///
/// Every field is behind an `Arc`, so cloning is cheap. Clones share the same metadata
/// store, bookie client and placement policy.
#[derive(Clone)]
pub struct BookKeeper {
    /// Ledger metadata store. `BookKeeper::new` chooses etcd or ZooKeeper from the URI scheme.
    meta_store: Arc<dyn MetaStore>,
    /// Bookie client shared by every reader and writer created from this handle.
    bookie_client: Arc<PoolledClient>,
    /// Picks bookie ensembles. Used on ledger creation and passed to writers and recovery.
    placement_policy: Arc<RandomPlacementPolicy>,
}
169
170async fn relay_metadata_stream(
171 mut updater: LedgerMetadataUpdater,
172 mut stream: Box<dyn LedgerMetadataStream>,
173 mut drop_watcher: DropWatcher,
174) {
175 loop {
176 select! {
177 _ = drop_watcher.dropped() => break,
178 r = stream.next() => match r {
179 Err(_) => continue,
180 Ok(metadata) => updater.update(metadata),
181 },
182 }
183 }
184 stream.cancel().await;
185}
186
187async fn merge_metadata_stream_and_updates(
188 mut updater: LedgerMetadataUpdater,
189 mut stream: Box<dyn LedgerMetadataStream>,
190 mut updates: mpsc::Receiver<Versioned<LedgerMetadata>>,
191 mut drop_watcher: DropWatcher,
192) {
193 loop {
194 select! {
195 _ = drop_watcher.dropped() => break,
196 r = stream.next() => match r {
197 Err(_) => continue,
198 Ok(metadata) => updater.update(metadata),
199 },
200 r = updates.recv() =>
201 match r {
202 Some(metadata) => updater.update(metadata),
203 None => {
204 tokio::spawn(async move {
205 relay_metadata_stream(updater, stream, drop_watcher).await;
206 });
207 return;
208 }
209 } ,
210 }
211 }
212 stream.cancel().await;
213}
214
215fn watch_metadata_stream(
216 metadata: Versioned<LedgerMetadata>,
217 stream: Box<dyn LedgerMetadataStream>,
218) -> (UpdatingLedgerMetadata, DropOwner) {
219 let updater = LedgerMetadataUpdater::new(metadata);
220 let updating = updater.subscribe();
221 let (drop_owner, drop_watcher) = utils::drop_watcher();
222 tokio::spawn(async move { relay_metadata_stream(updater, stream, drop_watcher).await });
223 (updating, drop_owner)
224}
225
226fn watch_metadata_stream_and_updates(
227 metadata: Versioned<LedgerMetadata>,
228 stream: Box<dyn LedgerMetadataStream>,
229 updates: mpsc::Receiver<Versioned<LedgerMetadata>>,
230) -> (UpdatingLedgerMetadata, DropOwner) {
231 let updater = LedgerMetadataUpdater::new(metadata);
232 let updating = updater.subscribe();
233 let (drop_owner, drop_watcher) = utils::drop_watcher();
234 tokio::spawn(async move { merge_metadata_stream_and_updates(updater, stream, updates, drop_watcher).await });
235 (updating, drop_owner)
236}
237
238impl BookKeeper {
239 pub async fn new(config: Configuration) -> Result<BookKeeper> {
241 let service_uri = config.service_uri.parse::<ServiceUri>()?;
242 let bookie_registry = match &config.bookies {
243 None => None,
244 Some(bookie_addresses) => Some(BookieRegistry::with_bookies(bookie_addresses)?),
245 };
246 let (meta_store, bookie_registry): (Arc<dyn MetaStore>, _) = if service_uri.scheme == "etcd" {
247 let endpoints = [service_uri.address.as_str()];
248 let etcd_configuration = EtcdConfiguration::new(service_uri.path);
249 let mut meta_store = EtcdMetaStore::connect(&endpoints, etcd_configuration).await?;
250 let bookie_registry = match bookie_registry {
251 None => BookieRegistry::new(&mut meta_store).await?,
252 Some(bookie_registry) => bookie_registry,
253 };
254 (Arc::new(meta_store), bookie_registry)
255 } else if service_uri.scheme == "zk" {
256 let zk_configuration = ZkConfiguration::from_service_uri(service_uri)?;
257 let mut meta_store = ZkMetaStore::new(zk_configuration).await?;
258 let bookie_registry = match bookie_registry {
259 None => BookieRegistry::new(&mut meta_store).await?,
260 Some(bookie_registry) => bookie_registry,
261 };
262 (Arc::new(meta_store), bookie_registry)
263 } else {
264 let msg = format!("unknown service scheme {}", service_uri.scheme);
265 return Err(BkError::with_message(ErrorKind::InvalidServiceUri, msg));
266 };
267 let placement_policy = RandomPlacementPolicy::new(bookie_registry.clone());
268 let poolled_client = Arc::new(PoolledClient::new(bookie_registry));
269 let bookkeeper =
270 BookKeeper { meta_store, bookie_client: poolled_client, placement_policy: Arc::new(placement_policy) };
271 Ok(bookkeeper)
272 }
273
274 pub async fn open_ledger(&self, ledger_id: LedgerId, options: &OpenOptions<'_>) -> Result<LedgerReader> {
276 let metadata = self.meta_store.read_ledger_metadata(ledger_id).await?;
277 let entry_distribution = EntryDistribution::from_metadata(&metadata);
278 if !options.administrative
279 && (options.digest_type != metadata.digest_type || options.password != metadata.password)
280 {
281 return Err(BkError::new(ErrorKind::UnauthorizedAccess));
282 }
283 let closed = metadata.closed();
284 let needs_recovery = options.recovery && !closed;
285 let digest_algorithm = DigestAlgorithm::new(metadata.digest_type, &metadata.password);
286 let master_key = digest::generate_master_key(&metadata.password);
287 let metadata_stream = self.meta_store.watch_ledger_metadata(ledger_id, metadata.version).await?;
288 let (metadata, drop_owner, metadata_sender) = if needs_recovery {
289 let (metadata_sender, metadata_receiver) = mpsc::channel(128);
290 let (metadata, drop_owner) =
291 watch_metadata_stream_and_updates(metadata, metadata_stream, metadata_receiver);
292 (metadata, drop_owner, Some(metadata_sender))
293 } else {
294 let (metadata, drop_owner) = watch_metadata_stream(metadata, metadata_stream);
295 (metadata, drop_owner, None)
296 };
297 let mut ledger = LedgerReader {
298 ledger_id,
299 metadata,
300 client: self.bookie_client.clone(),
301 entry_distribution,
302 master_key,
303 digest_algorithm,
304 _drop_owner: drop_owner.into(),
305 };
306 if let Some(metadata_sender) = metadata_sender {
307 ledger.recover(metadata_sender, &self.meta_store, self.placement_policy.clone()).await?;
308 }
309 Ok(ledger)
310 }
311
312 pub async fn create_ledger(&self, options: CreateOptions) -> Result<LedgerAppender> {
314 options.validate()?;
315 let ledger_id = if let Some(ledger_id) = options.ledger_id {
316 ledger_id
317 } else {
318 self.meta_store.generate_ledger_id().await?
319 };
320 let ensemble = self.placement_policy.select_ensemble(&EnsembleOptions {
321 ensemble_size: options.ensemble_size,
322 write_quorum: options.write_quorum_size,
323 ack_quorum: options.ack_quorum_size,
324 custom_metadata: &options.custom_metadata,
325 preferred_bookies: &[],
326 excluded_bookies: HashSet::new(),
327 })?;
328 let metadata = LedgerMetadata {
329 ledger_id,
330 length: LedgerLength::ZERO,
331 last_entry_id: EntryId::INVALID,
332 state: LedgerState::Open,
333 password: options.password,
334 ensemble_size: options.ensemble_size,
335 write_quorum_size: options.write_quorum_size,
336 ack_quorum_size: options.ack_quorum_size,
337 ensembles: vec![LedgerEnsemble { first_entry_id: EntryId::MIN, bookies: ensemble }],
338 digest_type: options.digest_type,
339 custom_metadata: options.custom_metadata,
340 format_version: 3,
341 creation_time: Some(SystemTime::now()),
342 creator_token: (rand::random::<usize>() & i64::MAX as usize) as i64,
343 };
344 let version = self.meta_store.create_ledger_metadata(&metadata).await?;
345 let master_key = digest::generate_master_key(&metadata.password);
346 let digest_algorithm = DigestAlgorithm::new(metadata.digest_type, &metadata.password);
347 let writer_options = WriterOptions {
348 deferred_sync: options.deferred_sync,
349 master_key,
350 digest_algorithm: digest_algorithm.clone(),
351 };
352 let entry_distribution = EntryDistribution::from_metadata(&metadata);
353 let (request_sender, metadata, drop_owner) =
354 self.start_ledger_writer(writer_options, version, metadata).await?;
355 Ok(LedgerAppender {
356 reader: LedgerReader {
357 ledger_id,
358 metadata,
359 client: self.bookie_client.clone(),
360 entry_distribution,
361 master_key,
362 digest_algorithm,
363 _drop_owner: drop_owner.into(),
364 },
365 last_add_entry_id: Arc::new(EntryId::INVALID.into()),
366 request_sender,
367 })
368 }
369
370 async fn start_ledger_writer(
371 &self,
372 options: WriterOptions,
373 version: MetaVersion,
374 metadata: LedgerMetadata,
375 ) -> Result<(mpsc::UnboundedSender<WriteRequest>, UpdatingLedgerMetadata, DropOwner)> {
376 let (request_sender, request_receiver) = mpsc::unbounded_channel();
377 let metadata_stream = self.meta_store.watch_ledger_metadata(metadata.ledger_id, version).await?;
378 let (metadata_sender, metadata_receiver) = mpsc::channel(128);
379 let (updating_metadata, drop_owner) = watch_metadata_stream_and_updates(
380 Versioned::new(version, metadata.clone()),
381 metadata_stream,
382 metadata_receiver,
383 );
384 let metadata = updating_metadata.borrow();
385 let writer = LedgerWriter {
386 ledger_id: metadata.ledger_id,
387 client: self.bookie_client.clone(),
388 deferred_sync: options.deferred_sync,
389 entry_distribution: EntryDistribution::from_metadata(&metadata),
390 master_key: options.master_key,
391 digest_algorithm: options.digest_algorithm,
392 meta_store: self.meta_store.clone(),
393 placement_policy: self.placement_policy.clone(),
394 };
395 let metadata = metadata.clone();
396 tokio::spawn(async move {
397 writer.write_state_loop(metadata, EntryId::INVALID, 0i64.into(), request_receiver, metadata_sender).await;
398 });
399 Ok((request_sender, updating_metadata, drop_owner))
400 }
401
402 pub async fn delete_ledger(&self, ledger_id: LedgerId, _options: DeleteOptions) -> Result<()> {
407 self.meta_store.remove_ledger_metadata(ledger_id, None).await?;
408 Ok(())
409 }
410}