use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::future::{FusedFuture, FutureExt};
use static_assertions::assert_impl_all;
use tokio::select;
use tokio::sync::{mpsc, oneshot};

use super::bookie::{self, PolledEntry, PoolledClient};
use super::digest::Algorithm as DigestAlgorithm;
use super::entry_distribution::{EntryDistribution, HasEntryDistribution};
use super::errors::{BkError, ErrorKind};
use super::metadata::{BookieId, EntryId, LedgerId, LedgerLength, LedgerMetadata, LedgerState, UpdatingLedgerMetadata};
use super::placement::RandomPlacementPolicy;
use super::writer::{LedgerWriter, WriteRequest};
use crate::future::SelectAll;
use crate::meta::{MetaStore, MetaVersion, Versioned};
use crate::utils::DropOwner;

type Result<T> = std::result::Result<T, BkError>;

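/// Options for [`LedgerReader::read`] and [`LedgerReader::read_unconfirmed`].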
#[derive(Default)]
#[non_exhaustive]
pub struct ReadOptions {
    parallel: bool,
}

impl ReadOptions {
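    /// Reads each entry from the bookies of its write set concurrently instead of trying them
    /// one after another.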
    pub fn parallel(self) -> Self {
        ReadOptions { parallel: true, ..self }
    }
}

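/// Options for [`LedgerReader::poll`].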
#[derive(Debug)]
#[non_exhaustive]
pub struct PollOptions {
    parallel: bool,
    timeout: Duration,
}

impl PollOptions {
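    /// Constructs options with the given overall timeout; polling defaults to sequential.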
    pub fn new(timeout: Duration) -> PollOptions {
        PollOptions { parallel: false, timeout }
    }

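    /// Polls all bookies of the entry's ensemble concurrently instead of one after another.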
    pub fn parallel(self) -> Self {
        PollOptions { parallel: true, ..self }
    }
}

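/// Options for [`LedgerReader::read_last_add_confirmed`].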
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct LacOptions {
    quorum: bool,
}

impl LacOptions {
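    /// Waits for responses from a quorum-covering set of bookies instead of returning on the
    /// first bookie that reports a newer last-add-confirmed entry id.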
    pub fn quorum(self) -> Self {
        LacOptions { quorum: true, ..self }
    }
}

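/// Reader over a single ledger. It tracks the ledger metadata and the locally known
/// last-add-confirmed entry id, and fetches entries from the ledger's bookies.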
#[derive(Clone)]
pub struct LedgerReader {
    pub(crate) ledger_id: LedgerId,
    pub(crate) metadata: UpdatingLedgerMetadata,
    pub(crate) client: Arc<PoolledClient>,
    pub(crate) entry_distribution: EntryDistribution,
    pub(crate) master_key: [u8; 20],
    pub(crate) digest_algorithm: DigestAlgorithm,
    pub(crate) _drop_owner: Arc<DropOwner>,
}

assert_impl_all!(LedgerReader: Send, Sync);

impl std::fmt::Debug for LedgerReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LedgerReader{{ledger_id: {}}}", self.ledger_id)
    }
}

impl HasEntryDistribution for LedgerReader {
    fn entry_distribution(&self) -> &EntryDistribution {
        &self.entry_distribution
    }
}

impl LedgerReader {
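    /// Returns the id of the ledger this reader reads from.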
    pub fn id(&self) -> LedgerId {
        self.ledger_id
    }

    pub(crate) fn update_metadata(&mut self, metadata: Versioned<LedgerMetadata>) {
        self.metadata.update(metadata)
    }

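    /// Returns the locally known last-add-confirmed entry id; entries up to this id are
    /// confirmed and safe to read.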
    pub fn last_add_confirmed(&self) -> EntryId {
        self.metadata.lac()
    }

    fn update_last_add_confirmed(&self, last_add_confirmed: EntryId) -> EntryId {
        self.metadata.update_lac(last_add_confirmed)
    }

    fn read_options(&self, fence: bool) -> bookie::ReadOptions<'_> {
        bookie::ReadOptions {
            fence_ledger: fence,
            high_priority: fence,
            digest_algorithm: &self.digest_algorithm,
            master_key: if fence { Some(&self.master_key) } else { None },
        }
    }

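    // Polling and reading helpers: the "sequentially" variants try one bookie at a time and
    // return the first success, while the "parallelly" variants issue all requests at once and
    // take the first success; the first error seen is reported if every attempt fails.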
    async fn poll_sequentially(
        &self,
        entry_id: EntryId,
        bookies: &[BookieId],
        timeout: Duration,
    ) -> Result<PolledEntry> {
        let options = bookie::PollOptions { timeout, digest_algorithm: &self.digest_algorithm };
        let write_set = self.new_write_set(entry_id);
        let mut err = None;
        for i in write_set.iter() {
            let bookie_id = &bookies[i];
            let result = self.client.poll_entry(bookie_id, self.id(), entry_id, &options).await;
            match result {
                Ok(polled_entry) => return Ok(polled_entry),
                Err(e) => err = err.or(Some(e)),
            }
        }
        Err(err.unwrap())
    }

    async fn poll_parallelly(&self, entry_id: EntryId, bookies: &[BookieId], timeout: Duration) -> Result<PolledEntry> {
        let options = bookie::PollOptions { timeout, digest_algorithm: &self.digest_algorithm };
        let mut futures = Vec::with_capacity(bookies.len());
        for bookie_id in bookies {
            let future = self.client.poll_entry(bookie_id, self.id(), entry_id, &options);
            futures.push(future.fuse());
        }
        let mut select_all = SelectAll::new(&mut futures);
        let mut err = None;
        for _ in 0..bookies.len() {
            let (_, r) = select_all.next().await;
            match r {
                Ok(polled_entry) => return Ok(polled_entry),
                Err(e) => err = err.or(Some(e)),
            }
        }
        Err(err.unwrap())
    }

    async fn read_sequentially<'a>(
        &'a self,
        entry_id: EntryId,
        fence: bool,
        ensemble: &'a [BookieId],
    ) -> Result<bookie::FetchedEntry> {
        let ensemble = unsafe { std::slice::from_raw_parts(ensemble.as_ptr(), ensemble.len()) };
        let write_set = self.new_write_set(entry_id);
        let read_options = self.read_options(fence);
        let mut err = None;
        for i in write_set.iter() {
            let bookie_id = &ensemble[i];
            let result = self.client.read_entry(bookie_id, self.ledger_id, entry_id, &read_options).await;
            match result {
                Ok(fetched_entry) => return Ok(fetched_entry),
                Err(e) => err = err.or(Some(e)),
            }
        }
        Err(err.unwrap())
    }

    async fn read_parallelly(
        &self,
        entry_id: EntryId,
        fence: bool,
        ensemble: &[BookieId],
    ) -> Result<bookie::FetchedEntry> {
        let write_set = self.new_write_set(entry_id);
        let read_options = self.read_options(fence);
        let mut futures = Vec::with_capacity(ensemble.len());
        for i in write_set.iter() {
            let bookie_id = &ensemble[i];
            let future = self.client.read_entry(bookie_id, self.ledger_id, entry_id, &read_options);
            futures.push(future.fuse());
        }
        let mut select_all = SelectAll::new(&mut futures);
        let mut err = None;
        for _ in write_set.iter() {
            let (_, r) = select_all.next().await;
            match r {
                Ok(fetched_entry) => return Ok(fetched_entry),
                Err(e) => err = err.or(Some(e)),
            }
        }
        Err(err.unwrap())
    }

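    // Reads entries [first_entry, last_entry] by walking the ensembles recorded in the metadata,
    // issuing one read future per entry and collecting payloads in entry order as they complete.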
    async fn read_entries<'a, F, R>(
        &'a self,
        first_entry: EntryId,
        last_entry: EntryId,
        metadata: &'a LedgerMetadata,
        read_fn: F,
    ) -> Result<Vec<Vec<u8>>>
    where
        R: Future<Output = Result<bookie::FetchedEntry>>,
        F: Fn(EntryId, &'a [BookieId]) -> R, {
        let n_entries = (last_entry - first_entry) as usize + 1;
        let mut reading_futures = Vec::with_capacity(n_entries);
        let mut reading_entry = first_entry;
        let mut ensemble_iter = metadata.ensemble_iter(first_entry);
        let (_, mut bookies, mut next_ensemble_entry_id) = unsafe { ensemble_iter.next().unwrap_unchecked() };
        while reading_entry <= last_entry {
            if reading_entry == next_ensemble_entry_id {
                (_, bookies, next_ensemble_entry_id) = unsafe { ensemble_iter.next().unwrap_unchecked() };
            }
            reading_futures.push(read_fn(reading_entry, bookies).fuse());
            reading_entry += 1;
        }
        let mut select_all = SelectAll::new(&mut reading_futures);
        let mut results = Vec::with_capacity(n_entries);
        results.resize(n_entries, Vec::new());
        let mut i = 0;
        while i < n_entries {
            let (j, r) = select_all.next().await;
            match r {
                Err(e) => return Err(e),
                Ok(r) => results[j] = r.payload,
            }
            i += 1;
        }
        Ok(results)
    }

    async fn read_internally(
        &self,
        first_entry: EntryId,
        last_entry: EntryId,
        metadata: &LedgerMetadata,
        options: Option<&ReadOptions>,
    ) -> Result<Vec<Vec<u8>>> {
        let parallel = options.map(|o| o.parallel).unwrap_or(false);
        let entries = if parallel {
            self.read_entries(first_entry, last_entry, metadata, |entry_id, bookies| {
                self.read_parallelly(entry_id, false, bookies)
            })
            .await?
        } else {
            self.read_entries(first_entry, last_entry, metadata, |entry_id, bookies| {
                self.read_sequentially(entry_id, false, bookies)
            })
            .await?
        };
        Ok(entries)
    }

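    /// Reads the payloads of entries `first_entry..=last_entry` in order. The range is validated
    /// against the ledger metadata before any bookie is contacted.
    ///
    /// A minimal usage sketch (not part of the original docs), assuming a connected `reader` for a
    /// ledger that already has confirmed entries:
    ///
    /// ```ignore
    /// // Read every confirmed entry, fanning requests out to replicas in parallel.
    /// let lac = reader.read_last_add_confirmed(&LacOptions::default()).await?;
    /// let entries = reader.read(EntryId::MIN, lac, Some(&ReadOptions::default().parallel())).await?;
    /// ```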
    pub async fn read(
        &self,
        first_entry: EntryId,
        last_entry: EntryId,
        options: Option<&ReadOptions>,
    ) -> Result<Vec<Vec<u8>>> {
        assert!(first_entry <= last_entry);
        assert!(first_entry >= EntryId::MIN);
        let metadata = self.metadata.check_read(last_entry)?;
        self.read_internally(first_entry, last_entry, &metadata, options).await
    }

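    /// Waits for `entry_id` to become readable and returns its payload. The entry is read directly
    /// when it is already confirmed; otherwise bookies are polled until the entry shows up, the
    /// ledger ends before it, or `options.timeout` elapses.
    ///
    /// A minimal usage sketch (not part of the original docs):
    ///
    /// ```ignore
    /// // Wait up to one second for entry 0 to be written and confirmed.
    /// let payload = reader.poll(EntryId::MIN, &PollOptions::new(Duration::from_secs(1))).await?;
    /// ```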
    pub async fn poll(&self, entry_id: EntryId, options: &PollOptions) -> Result<Vec<u8>> {
        assert!(entry_id >= EntryId::MIN);
        let parallel = options.parallel;
        let mut timeout = options.timeout;
        let deadline = Instant::now() + timeout;
        let epsilon = Duration::from_millis(1);
        loop {
            let mut last_add_confirmed = self.last_add_confirmed();
            let metadata = self.metadata.read();
            let (_, bookies, _) = metadata.ensemble_at(entry_id);
            if entry_id <= last_add_confirmed {
                let entry = if parallel {
                    self.read_parallelly(entry_id, false, bookies).await?
                } else {
                    self.read_sequentially(entry_id, false, bookies).await?
                };
                return Ok(entry.payload);
            }
            if timeout < epsilon {
                return Err(BkError::new(ErrorKind::Timeout));
            }
            let polled_entry = if parallel {
                self.poll_parallelly(entry_id, bookies, timeout).await?
            } else {
                self.poll_sequentially(entry_id, bookies, timeout).await?
            };
            if polled_entry.last_add_confirmed > last_add_confirmed {
                last_add_confirmed = polled_entry.last_add_confirmed;
                self.update_last_add_confirmed(last_add_confirmed);
            }
            if let Some(payload) = polled_entry.payload {
                return Ok(payload);
            } else if entry_id > last_add_confirmed {
                return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
            }
            timeout = deadline.saturating_duration_since(Instant::now());
        }
    }

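    // Drives `futures` (one per bookie of the ensemble) until the responses cover every possible
    // write quorum, folding successful values with `f`. Missing ledger/entry responses count as
    // completed; other errors mark the bookie as failed and the first one is reported if coverage
    // becomes impossible.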
    async fn cover_quorum<R, T, Fu, Fn>(&self, futures: &mut [Fu], initial: R, mut f: Fn) -> Result<R>
    where
        Fu: FusedFuture<Output = Result<T>>,
        Fn: FnMut(R, T) -> R, {
        assert_eq!(futures.len(), self.entry_distribution.ensemble_size);
        let mut acc = initial;
        let mut err = None;
        let mut quorum = self.entry_distribution.new_quorum_coverage_set();
        let mut select_all = SelectAll::new(futures);
        loop {
            let (i, r) = select_all.next().await;
            match r {
                Err(e) => {
                    if e.kind() == ErrorKind::LedgerNotExisted || e.kind() == ErrorKind::EntryNotExisted {
                        quorum.complete_bookie(i);
                    } else {
                        quorum.fail_bookie(i);
                        err = err.or(Some(e));
                    }
                },
                Ok(value) => {
                    acc = f(acc, value);
                    quorum.complete_bookie(i);
                },
            };
            if let Some(covered) = quorum.covered() {
                if covered {
                    return Ok(acc);
                }
                return Err(err.unwrap());
            }
        }
    }

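    // Determines the last confirmed entry id and ledger length. When the metadata already records
    // them it answers directly; otherwise the bookies of the last ensemble are asked for their last
    // entry (optionally fencing the ledger) and the highest LAC covered by a quorum wins.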
    async fn read_last_confirmed_meta(&self, fence: bool) -> Result<(EntryId, LedgerLength)> {
        let metadata = match self.metadata.last_confirmed_meta() {
            Ok(last_confirmed_meta) => return Ok(last_confirmed_meta),
            Err(metadata) => metadata,
        };
        let ensemble = metadata.last_ensemble();
        let options = bookie::ReadOptions {
            fence_ledger: fence,
            high_priority: false,
            master_key: if fence { Some(&self.master_key) } else { None },
            digest_algorithm: &self.digest_algorithm,
        };
        let mut readings = Vec::with_capacity(ensemble.bookies.len());
        for bookie_id in ensemble.bookies.iter() {
            let read = self.client.read_last_entry(bookie_id, self.id(), &options);
            readings.push(read.fuse());
        }
        let last_add_confirmed = self
            .cover_quorum(
                &mut readings,
                ensemble.first_entry_id - 1,
                |last_add_confirmed, (_, bookie::FetchedEntry { max_lac, .. })| last_add_confirmed.max(max_lac),
            )
            .await?;
        if last_add_confirmed == EntryId::INVALID {
            return Ok((EntryId::INVALID, 0i64.into()));
        }
        let (_, bookies, _) = metadata.ensemble_at(last_add_confirmed);
        let fetched_entry = self.read_parallelly(last_add_confirmed, false, bookies).await?;
        Ok((last_add_confirmed, fetched_entry.ledger_length))
    }

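    /// Refreshes the last-add-confirmed entry id from the bookies of the last ensemble and returns
    /// the updated value. A closed ledger returns its final entry id without contacting bookies.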
    pub async fn read_last_add_confirmed(&self, options: &LacOptions) -> Result<EntryId> {
        if let Some(last_entry_id) = self.metadata.closed_entry_id() {
            return Ok(last_entry_id);
        }
        let metadata = self.metadata.read();
        let ensemble = metadata.last_ensemble();
        let mut readings = Vec::with_capacity(ensemble.bookies.len());
        for bookie_id in ensemble.bookies.iter() {
            let read = self.client.read_lac(bookie_id, self.id(), &self.digest_algorithm);
            readings.push(read.fuse());
        }
        let last_add_confirmed = self.last_add_confirmed();
        if !options.quorum {
            let mut select_all = SelectAll::new(&mut readings);
            let mut err = None;
            loop {
                select! {
                    (_, r) = select_all.next() => {
                        match r {
                            Err(e) => err = err.or(Some(e)),
                            Ok(entry_id) if entry_id > last_add_confirmed => {
                                return Ok(self.update_last_add_confirmed(entry_id));
                            },
                            _ => {},
                        };
                    },
                }
                if select_all.is_terminated() {
                    if let Some(err) = err {
                        return Err(err);
                    }
                    return Ok(self.last_add_confirmed());
                }
            }
        }
        let last_add_confirmed = self.cover_quorum(&mut readings, last_add_confirmed, |acc, new| acc.max(new)).await?;
        Ok(self.update_last_add_confirmed(last_add_confirmed))
    }

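    /// Reads entries that may lie beyond the last-add-confirmed entry id. Such entries are not yet
    /// guaranteed to be durable, so callers must tolerate them disappearing after a failure.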
    pub async fn read_unconfirmed(
        &self,
        first_entry: EntryId,
        last_entry: EntryId,
        options: Option<&ReadOptions>,
    ) -> Result<Vec<Vec<u8>>> {
        assert!(first_entry <= last_entry);
        assert!(first_entry >= EntryId::MIN);
        let metadata = self.metadata.check_unconfirmed_read(last_entry)?;
        self.read_internally(first_entry, last_entry, &metadata, options).await
    }

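    // Marks the ledger as in-recovery in the meta store, retrying on conflicting metadata versions.
    // Returns the (possibly already closed) metadata to recover from, and fails with
    // LedgerConcurrentClose if another client already started recovery.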
    async fn recover_open_metadata(
        &self,
        metadata: Versioned<LedgerMetadata>,
        meta_store: &Arc<dyn MetaStore>,
    ) -> Result<(MetaVersion, LedgerMetadata)> {
        let Versioned { mut version, value: mut metadata } = metadata;
        loop {
            if metadata.state == LedgerState::Closed {
                return Ok((version, metadata));
            } else if metadata.state == LedgerState::InRecovery {
                return Err(BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger already in recovery"));
            }
            metadata.state = LedgerState::InRecovery;
            let r = meta_store.write_ledger_metadata(&metadata, version).await?;
            match r {
                either::Right(version) => return Ok((version, metadata)),
                either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
                    version = conflicting_version;
                    metadata = conflicting_metadata;
                },
            }
        }
    }

    fn start_recover_writer(
        &self,
        metadata: Versioned<LedgerMetadata>,
        metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
        meta_store: &Arc<dyn MetaStore>,
        placement_policy: Arc<RandomPlacementPolicy>,
        last_confirmed_entry_id: EntryId,
        last_confirmed_ledger_length: LedgerLength,
    ) -> mpsc::UnboundedSender<WriteRequest> {
        let (request_sender, request_receiver) = mpsc::unbounded_channel();
        let writer = LedgerWriter {
            ledger_id: metadata.value.ledger_id,
            client: self.client.clone(),
            deferred_sync: false,
            entry_distribution: EntryDistribution::from_metadata(&metadata.value),
            master_key: self.master_key,
            digest_algorithm: self.digest_algorithm.clone(),
            meta_store: meta_store.clone(),
            placement_policy,
        };
        tokio::spawn(async move {
            writer
                .write_state_loop(
                    metadata,
                    last_confirmed_entry_id,
                    last_confirmed_ledger_length,
                    request_receiver,
                    metadata_sender,
                )
                .await;
        });
        request_sender
    }

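    // Ledger recovery: mark the metadata as in-recovery, fence the bookies while locating the last
    // confirmed entry, re-write every entry found past it through a temporary recovery writer, and
    // finally close the ledger with the recovered metadata.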
    pub(crate) async fn recover(
        &mut self,
        metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
        meta_store: &Arc<dyn MetaStore>,
        placement_policy: Arc<RandomPlacementPolicy>,
    ) -> Result<()> {
        let metadata = self.metadata.read();
        let (version, metadata) = self.recover_open_metadata(Versioned::clone(&metadata), meta_store).await?;
        if metadata.closed() {
            self.update_metadata(Versioned::new(version, metadata));
            return Ok(());
        }
        let (mut last_add_confirmed, ledger_length) = self.read_last_confirmed_meta(true).await?;
        let request_sender = self.start_recover_writer(
            Versioned::new(version, metadata.clone()),
            metadata_sender,
            meta_store,
            placement_policy,
            last_add_confirmed,
            ledger_length,
        );
        let ensemble = metadata.last_ensemble();
        loop {
            let entry_id = last_add_confirmed + 1;
            let payload = match self.read_parallelly(entry_id, true, &ensemble.bookies).await {
                Err(e) => {
                    let kind = e.kind();
                    if kind == ErrorKind::EntryNotExisted || kind == ErrorKind::LedgerNotExisted {
                        break;
                    }
                    return Err(e);
                },
                Ok(fetched_entry) => fetched_entry.payload,
            };
            let (sender, receiver) = oneshot::channel();
            if request_sender.send(WriteRequest::Append { entry_id, payload, responser: sender }).is_err() {
                let err = BkError::with_description(ErrorKind::UnexpectedError, &"writer closed during recovery");
                return Err(err);
            }
            receiver.await.map_err(|_| {
                BkError::with_description(ErrorKind::UnexpectedError, &"writer failure during recovery")
            })??;
            last_add_confirmed = entry_id;
        }
        let (close_sender, close_receiver) = oneshot::channel();
        request_sender.send(WriteRequest::Close { responser: close_sender }).unwrap();
        let metadata = close_receiver.await.unwrap()?;
        self.update_metadata(metadata);
        Ok(())
    }

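    /// Returns whether the ledger is known to be closed according to the current metadata.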
    pub fn closed(&self) -> bool {
        self.metadata.borrow().closed()
    }
}