// bookkeeper_client/client/reader.rs

1use std::future::Future;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use futures::future::{FusedFuture, FutureExt};
6use static_assertions::assert_impl_all;
7use tokio::select;
8use tokio::sync::{mpsc, oneshot};
9
10use super::bookie::{self, PolledEntry, PoolledClient};
11use super::digest::Algorithm as DigestAlgorithm;
12use super::entry_distribution::{EntryDistribution, HasEntryDistribution};
13use super::errors::{BkError, ErrorKind};
14use super::metadata::{BookieId, EntryId, LedgerId, LedgerLength, LedgerMetadata, LedgerState, UpdatingLedgerMetadata};
15use super::placement::RandomPlacementPolicy;
16use super::writer::{LedgerWriter, WriteRequest};
17use crate::future::SelectAll;
18use crate::meta::{MetaStore, MetaVersion, Versioned};
19use crate::utils::DropOwner;
20
// Module-local shorthand: every fallible operation here fails with `BkError`.
type Result<T> = std::result::Result<T, BkError>;
22
/// Options to read entries.
#[derive(Default)]
#[non_exhaustive]
pub struct ReadOptions {
    // When true, each entry is fetched from all bookies of its write set
    // concurrently instead of trying them one at a time.
    parallel: bool,
}
29
30impl ReadOptions {
31    /// Reads entries from bookies parallelly.
32    pub fn parallel(self) -> Self {
33        ReadOptions { parallel: true, ..self }
34    }
35}
36
/// Options to poll written or about-to-write entry.
#[derive(Debug)]
#[non_exhaustive]
pub struct PollOptions {
    // When true, all candidate bookies are polled concurrently.
    parallel: bool,
    // Overall time budget for the poll operation.
    timeout: Duration,
}
44
45impl PollOptions {
46    /// Constructs options for polling entry with given timeout.
47    pub fn new(timeout: Duration) -> PollOptions {
48        PollOptions { parallel: false, timeout }
49    }
50
51    /// Polls entry from write bookies parallelly.
52    pub fn parallel(self) -> Self {
53        PollOptions { parallel: true, ..self }
54    }
55}
56
/// Options to read last_add_confirmed.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct LacOptions {
    // When true, waits for responses from a quorum of the ensemble before
    // considering the read successful (see `read_last_add_confirmed`).
    quorum: bool,
}
63
64impl LacOptions {
65    /// Waits reads from quorum of ensemble to consider success.
66    pub fn quorum(self) -> Self {
67        LacOptions { quorum: true, ..self }
68    }
69}
70
/// Ledger reader.
#[derive(Clone)]
pub struct LedgerReader {
    // Id of the ledger this reader reads from.
    pub(crate) ledger_id: LedgerId,
    // Locally cached, refreshable ledger metadata (ensembles, lac, state).
    pub(crate) metadata: UpdatingLedgerMetadata,
    // Pooled connections to individual bookies.
    pub(crate) client: Arc<PoolledClient>,
    // Describes how entries are striped across the ensemble.
    pub(crate) entry_distribution: EntryDistribution,
    // Key sent to bookies when fencing reads (see `read_options`).
    pub(crate) master_key: [u8; 20],
    // Digest used when reading/polling entries.
    pub(crate) digest_algorithm: DigestAlgorithm,
    // NOTE(review): presumably keeps shared background resources alive while
    // any clone of this reader exists — confirm against client construction.
    pub(crate) _drop_owner: Arc<DropOwner>,
}
82
// Compile-time check that the reader can be shared across tasks and threads.
assert_impl_all!(LedgerReader: Send, Sync);
84
impl std::fmt::Debug for LedgerReader {
    // Manual impl keeps the output to the ledger id only; the format string
    // is part of observable behavior, so keep it stable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LedgerReader{{ledger_id: {}}}", self.ledger_id)
    }
}
90
impl HasEntryDistribution for LedgerReader {
    // Exposes the striping layout; used by write-set computations such as
    // `new_write_set`.
    fn entry_distribution(&self) -> &EntryDistribution {
        &self.entry_distribution
    }
}
96
97impl LedgerReader {
    /// Returns ledger id.
    ///
    /// The id is fixed at construction and never changes for this reader.
    pub fn id(&self) -> LedgerId {
        self.ledger_id
    }
102
    // Replaces the locally cached ledger metadata with a newer versioned
    // snapshot (e.g. after recovery closes the ledger).
    pub(crate) fn update_metadata(&mut self, metadata: Versioned<LedgerMetadata>) {
        self.metadata.update(metadata)
    }
106
    /// Gets local cached last_add_confirmed which could vary due to concurrent read and write.
    ///
    /// The cache is advanced by polls and lac reads; it may lag behind the
    /// value known to the bookies.
    pub fn last_add_confirmed(&self) -> EntryId {
        self.metadata.lac()
    }
111
    // Feeds a newly observed last_add_confirmed into the cached metadata and
    // returns the value the cache settles on.
    fn update_last_add_confirmed(&self, last_add_confirmed: EntryId) -> EntryId {
        self.metadata.update_lac(last_add_confirmed)
    }
115
116    fn read_options(&self, fence: bool) -> bookie::ReadOptions<'_> {
117        bookie::ReadOptions {
118            fence_ledger: fence,
119            high_priority: fence,
120            digest_algorithm: &self.digest_algorithm,
121            master_key: if fence { Some(&self.master_key) } else { None },
122        }
123    }
124
125    async fn poll_sequentially(
126        &self,
127        entry_id: EntryId,
128        bookies: &[BookieId],
129        timeout: Duration,
130    ) -> Result<PolledEntry> {
131        let options = bookie::PollOptions { timeout, digest_algorithm: &self.digest_algorithm };
132        let write_set = self.new_write_set(entry_id);
133        let mut err = None;
134        for i in write_set.iter() {
135            let bookie_id = &bookies[i];
136            let result = self.client.poll_entry(bookie_id, self.id(), entry_id, &options).await;
137            match result {
138                Ok(polled_entry) => return Ok(polled_entry),
139                Err(e) => err = err.or(Some(e)),
140            }
141        }
142        Err(err.unwrap())
143    }
144
145    async fn poll_parallelly(&self, entry_id: EntryId, bookies: &[BookieId], timeout: Duration) -> Result<PolledEntry> {
146        let options = bookie::PollOptions { timeout, digest_algorithm: &self.digest_algorithm };
147        let mut futures = Vec::with_capacity(bookies.len());
148        for bookie_id in bookies {
149            let future = self.client.poll_entry(bookie_id, self.id(), entry_id, &options);
150            futures.push(future.fuse());
151        }
152        let mut select_all = SelectAll::new(&mut futures);
153        let mut err = None;
154        for _ in 0..bookies.len() {
155            let (_, r) = select_all.next().await;
156            match r {
157                Ok(polled_entry) => return Ok(polled_entry),
158                Err(e) => err = err.or(Some(e)),
159            }
160        }
161        Err(err.unwrap())
162    }
163
164    async fn read_sequentially<'a>(
165        &'a self,
166        entry_id: EntryId,
167        fence: bool,
168        ensemble: &'a [BookieId],
169    ) -> Result<bookie::FetchedEntry> {
170        let ensemble = unsafe { std::slice::from_raw_parts(ensemble.as_ptr(), ensemble.len()) };
171        let write_set = self.new_write_set(entry_id);
172        let read_options = self.read_options(fence);
173        let mut err = None;
174        for i in write_set.iter() {
175            let bookie_id = &ensemble[i];
176            let result = self.client.read_entry(bookie_id, self.ledger_id, entry_id, &read_options).await;
177            match result {
178                Ok(fetched_entry) => return Ok(fetched_entry),
179                Err(e) => err = err.or(Some(e)),
180            }
181        }
182        Err(err.unwrap())
183    }
184
185    async fn read_parallelly(
186        &self,
187        entry_id: EntryId,
188        fence: bool,
189        ensemble: &[BookieId],
190    ) -> Result<bookie::FetchedEntry> {
191        let write_set = self.new_write_set(entry_id);
192        let read_options = self.read_options(fence);
193        let mut futures = Vec::with_capacity(ensemble.len());
194        for i in write_set.iter() {
195            let bookie_id = &ensemble[i];
196            let future = self.client.read_entry(bookie_id, self.ledger_id, entry_id, &read_options);
197            futures.push(future.fuse());
198        }
199        let mut select_all = SelectAll::new(&mut futures);
200        let mut err = None;
201        for _ in write_set.iter() {
202            let (_, r) = select_all.next().await;
203            match r {
204                Ok(fetched_entry) => return Ok(fetched_entry),
205                Err(e) => err = err.or(Some(e)),
206            }
207        }
208        Err(err.unwrap())
209    }
210
211    async fn read_entries<'a, F, R>(
212        &'a self,
213        first_entry: EntryId,
214        last_entry: EntryId,
215        metadata: &'a LedgerMetadata,
216        read_fn: F,
217    ) -> Result<Vec<Vec<u8>>>
218    where
219        R: Future<Output = Result<bookie::FetchedEntry>>,
220        F: Fn(EntryId, &'a [BookieId]) -> R, {
221        let n_entries = (last_entry - first_entry) as usize + 1;
222        let mut reading_futures = Vec::with_capacity(n_entries);
223        let mut reading_entry = first_entry;
224        let mut ensemble_iter = metadata.ensemble_iter(first_entry);
225        let (_, mut bookies, mut next_ensemble_entry_id) = unsafe { ensemble_iter.next().unwrap_unchecked() };
226        while reading_entry <= last_entry {
227            if reading_entry == next_ensemble_entry_id {
228                (_, bookies, next_ensemble_entry_id) = unsafe { ensemble_iter.next().unwrap_unchecked() };
229            }
230            reading_futures.push(read_fn(reading_entry, bookies).fuse());
231            reading_entry += 1;
232        }
233        let mut select_all = SelectAll::new(&mut reading_futures);
234        let mut results = Vec::with_capacity(n_entries);
235        results.resize(n_entries, Vec::new());
236        let mut i = 0;
237        while i < n_entries {
238            let (j, r) = select_all.next().await;
239            match r {
240                Err(e) => return Err(e),
241                Ok(r) => results[j] = r.payload,
242            }
243            i += 1;
244        }
245        Ok(results)
246    }
247
248    async fn read_internally(
249        &self,
250        first_entry: EntryId,
251        last_entry: EntryId,
252        metadata: &LedgerMetadata,
253        options: Option<&ReadOptions>,
254    ) -> Result<Vec<Vec<u8>>> {
255        let parallel = options.map(|o| o.parallel).unwrap_or(false);
256        let entries = if parallel {
257            self.read_entries(first_entry, last_entry, metadata, |entry_id, bookies| {
258                self.read_parallelly(entry_id, false, bookies)
259            })
260            .await?
261        } else {
262            self.read_entries(first_entry, last_entry, metadata, |entry_id, bookies| {
263                self.read_sequentially(entry_id, false, bookies)
264            })
265            .await?
266        };
267        Ok(entries)
268    }
269
270    /// Reads entries from `first_entry` to `last_entry`.
271    pub async fn read(
272        &self,
273        first_entry: EntryId,
274        last_entry: EntryId,
275        options: Option<&ReadOptions>,
276    ) -> Result<Vec<Vec<u8>>> {
277        assert!(first_entry <= last_entry);
278        assert!(first_entry >= EntryId::MIN);
279        let metadata = self.metadata.check_read(last_entry)?;
280        self.read_internally(first_entry, last_entry, &metadata, options).await
281    }
282
    /// Polls entry with given id.
    ///
    /// If the entry is already confirmed locally it is read immediately;
    /// otherwise its ensemble is polled repeatedly until the entry shows up,
    /// `options.timeout` elapses, or the entry is found to exceed the
    /// confirmed tail.
    ///
    /// # Cautions
    /// * Ledger closing will not interrupt this operation.
    pub async fn poll(&self, entry_id: EntryId, options: &PollOptions) -> Result<Vec<u8>> {
        assert!(entry_id >= EntryId::MIN);
        let parallel = options.parallel;
        let mut timeout = options.timeout;
        let deadline = Instant::now() + timeout;
        // Below this remaining budget, give up rather than issue a poll that
        // would time out almost immediately.
        let epsilon = Duration::from_millis(1);
        loop {
            let mut last_add_confirmed = self.last_add_confirmed();
            let metadata = self.metadata.read();
            let (_, bookies, _) = metadata.ensemble_at(entry_id);
            // Entry already confirmed: a plain read suffices.
            if entry_id <= last_add_confirmed {
                let entry = if parallel {
                    self.read_parallelly(entry_id, false, bookies).await?
                } else {
                    self.read_sequentially(entry_id, false, bookies).await?
                };
                return Ok(entry.payload);
            }
            if timeout < epsilon {
                return Err(BkError::new(ErrorKind::Timeout));
            }
            let polled_entry = if parallel {
                self.poll_parallelly(entry_id, bookies, timeout).await?
            } else {
                self.poll_sequentially(entry_id, bookies, timeout).await?
            };
            // Propagate any lac progress observed during the poll into the
            // local cache.
            if polled_entry.last_add_confirmed > last_add_confirmed {
                last_add_confirmed = polled_entry.last_add_confirmed;
                self.update_last_add_confirmed(last_add_confirmed);
            }
            if let Some(payload) = polled_entry.payload {
                return Ok(payload);
            } else if entry_id > last_add_confirmed {
                return Err(BkError::new(ErrorKind::ReadExceedLastAddConfirmed));
            }
            // Confirmed but no payload in the poll response: retry with the
            // remaining time budget.
            timeout = deadline.saturating_duration_since(Instant::now());
        }
    }
325
    // Drives `futures` (one per ensemble bookie) until a quorum-coverage
    // decision is reached, folding successful responses into an accumulator
    // with `f`.
    //
    // `LedgerNotExisted`/`EntryNotExisted` are treated as definitive answers
    // (the bookie completed); any other error marks the bookie as failed and
    // the first such error is remembered. Returns the accumulated value once
    // the quorum is covered, or that first error when coverage becomes
    // impossible.
    async fn cover_quorum<R, T, Fu, Fn>(&self, futures: &mut [Fu], initial: R, mut f: Fn) -> Result<R>
    where
        Fu: FusedFuture<Output = Result<T>>,
        Fn: FnMut(R, T) -> R, {
        assert_eq!(futures.len(), self.entry_distribution.ensemble_size);
        let mut acc = initial;
        let mut err = None;
        let mut quorum = self.entry_distribution.new_quorum_coverage_set();
        let mut select_all = SelectAll::new(futures);
        loop {
            let (i, r) = select_all.next().await;
            match r {
                Err(e) => {
                    if e.kind() == ErrorKind::LedgerNotExisted || e.kind() == ErrorKind::EntryNotExisted {
                        quorum.complete_bookie(i);
                    } else {
                        quorum.fail_bookie(i);
                        err = err.or(Some(e));
                    }
                },
                Ok(value) => {
                    acc = f(acc, value);
                    quorum.complete_bookie(i);
                },
            };
            // `covered()` yields None while the outcome is still undecided.
            if let Some(covered) = quorum.covered() {
                if covered {
                    return Ok(acc);
                }
                return Err(err.unwrap());
            }
        }
    }
359
    // Determines `(last_add_confirmed, ledger_length)`, optionally fencing
    // the ledger on the bookies of its last ensemble.
    //
    // Fast path: the cached metadata already knows the confirmed tail. Slow
    // path: ask every bookie of the last ensemble for its last entry, take
    // the max lac over a covered quorum, then fetch that entry to learn the
    // ledger length.
    async fn read_last_confirmed_meta(&self, fence: bool) -> Result<(EntryId, LedgerLength)> {
        let metadata = match self.metadata.last_confirmed_meta() {
            Ok(last_confirmed_meta) => return Ok(last_confirmed_meta),
            Err(metadata) => metadata,
        };
        let ensemble = metadata.last_ensemble();
        let options = bookie::ReadOptions {
            fence_ledger: fence,
            high_priority: false,
            master_key: if fence { Some(&self.master_key) } else { None },
            digest_algorithm: &self.digest_algorithm,
        };
        let mut readings = Vec::with_capacity(ensemble.bookies.len());
        for bookie_id in ensemble.bookies.iter() {
            let read = self.client.read_last_entry(bookie_id, self.id(), &options);
            readings.push(read.fuse());
        }
        // Seed the fold with the entry just before this ensemble: if no
        // bookie reports anything newer, nothing in the last ensemble was
        // ever confirmed.
        let last_add_confirmed = self
            .cover_quorum(
                &mut readings,
                ensemble.first_entry_id - 1,
                |last_add_confirmed, (_, bookie::FetchedEntry { max_lac, .. })| last_add_confirmed.max(max_lac),
            )
            .await?;
        if last_add_confirmed == EntryId::INVALID {
            // No entry was ever confirmed: empty ledger.
            return Ok((EntryId::INVALID, 0i64.into()));
        }
        let (_, bookies, _) = metadata.ensemble_at(last_add_confirmed);
        let fetched_entry = self.read_parallelly(last_add_confirmed, false, bookies).await?;
        Ok((last_add_confirmed, fetched_entry.ledger_length))
    }
391
392    /// Reads last_add_confirmed from latest ensemble.
393    pub async fn read_last_add_confirmed(&self, options: &LacOptions) -> Result<EntryId> {
394        if let Some(last_entry_id) = self.metadata.closed_entry_id() {
395            return Ok(last_entry_id);
396        }
397        let metadata = self.metadata.read();
398        let ensemble = metadata.last_ensemble();
399        let mut readings = Vec::with_capacity(ensemble.bookies.len());
400        for bookie_id in ensemble.bookies.iter() {
401            let read = self.client.read_lac(bookie_id, self.id(), &self.digest_algorithm);
402            readings.push(read.fuse());
403        }
404        let last_add_confirmed = self.last_add_confirmed();
405        if !options.quorum {
406            let mut select_all = SelectAll::new(&mut readings);
407            let mut err = None;
408            loop {
409                select! {
410                    (_, r) = select_all.next() => {
411                        match r {
412                            Err(e) => err = err.or(Some(e)),
413                            Ok(entry_id) if entry_id > last_add_confirmed => {
414                                return Ok(self.update_last_add_confirmed(entry_id));
415                            },
416                            _ => {},
417                        };
418                    },
419                }
420                if select_all.is_terminated() {
421                    if let Some(err) = err {
422                        return Err(err);
423                    }
424                    return Ok(self.last_add_confirmed());
425                }
426            }
427        }
428        let last_add_confirmed = self.cover_quorum(&mut readings, last_add_confirmed, |acc, new| acc.max(new)).await?;
429        Ok(self.update_last_add_confirmed(last_add_confirmed))
430    }
431
    /// Reads entries without checking `last_add_confirmed` locally if ledger not considered
    /// closed.
    ///
    /// # Notable errors
    /// * [ErrorKind::ReadExceedLastAddConfirmed] if ledger closed and given entry id exceeds last
    /// add confirmed.
    /// * [ErrorKind::EntryNotExisted] if given entry does not exist.
    pub async fn read_unconfirmed(
        &self,
        first_entry: EntryId,
        last_entry: EntryId,
        options: Option<&ReadOptions>,
    ) -> Result<Vec<Vec<u8>>> {
        assert!(first_entry <= last_entry);
        assert!(first_entry >= EntryId::MIN);
        let metadata = self.metadata.check_unconfirmed_read(last_entry)?;
        self.read_internally(first_entry, last_entry, &metadata, options).await
    }
450
451    async fn recover_open_metadata(
452        &self,
453        metadata: Versioned<LedgerMetadata>,
454        meta_store: &Arc<dyn MetaStore>,
455    ) -> Result<(MetaVersion, LedgerMetadata)> {
456        let Versioned { mut version, value: mut metadata } = metadata;
457        loop {
458            if metadata.state == LedgerState::Closed {
459                return Ok((version, metadata));
460            } else if metadata.state == LedgerState::InRecovery {
461                // Someone is recovering, let it go.
462                return Err(BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger already in recovery"));
463            }
464            metadata.state = LedgerState::InRecovery;
465            let r = meta_store.write_ledger_metadata(&metadata, version).await?;
466            match r {
467                either::Right(version) => return Ok((version, metadata)),
468                either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
469                    version = conflicting_version;
470                    metadata = conflicting_metadata;
471                },
472            }
473        }
474    }
475
    // Spawns the background write loop used during recovery and returns the
    // channel that feeds it write requests. The writer mirrors this reader's
    // configuration (digest, master key, entry distribution) and resumes
    // from the discovered confirmed tail.
    fn start_recover_writer(
        &self,
        metadata: Versioned<LedgerMetadata>,
        metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
        meta_store: &Arc<dyn MetaStore>,
        placement_policy: Arc<RandomPlacementPolicy>,
        last_confirmed_entry_id: EntryId,
        last_confirmed_ledger_length: LedgerLength,
    ) -> mpsc::UnboundedSender<WriteRequest> {
        let (request_sender, request_receiver) = mpsc::unbounded_channel();
        let writer = LedgerWriter {
            ledger_id: metadata.value.ledger_id,
            client: self.client.clone(),
            // NOTE(review): deferred sync is disabled here — presumably so
            // recovery writes are durable before close; confirm intent.
            deferred_sync: false,
            entry_distribution: EntryDistribution::from_metadata(&metadata.value),
            master_key: self.master_key,
            digest_algorithm: self.digest_algorithm.clone(),
            meta_store: meta_store.clone(),
            placement_policy,
        };
        tokio::spawn(async move {
            writer
                .write_state_loop(
                    metadata,
                    last_confirmed_entry_id,
                    last_confirmed_ledger_length,
                    request_receiver,
                    metadata_sender,
                )
                .await;
        });
        request_sender
    }
509
510    pub(crate) async fn recover(
511        &mut self,
512        metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
513        meta_store: &Arc<dyn MetaStore>,
514        placement_policy: Arc<RandomPlacementPolicy>,
515    ) -> Result<()> {
516        let metadata = self.metadata.read();
517        let (version, metadata) = self.recover_open_metadata(Versioned::clone(&metadata), meta_store).await?;
518        if metadata.closed() {
519            self.update_metadata(Versioned::new(version, metadata));
520            return Ok(());
521        }
522        let (mut last_add_confirmed, ledger_length) = self.read_last_confirmed_meta(true).await?;
523        let request_sender = self.start_recover_writer(
524            Versioned::new(version, metadata.clone()),
525            metadata_sender,
526            meta_store,
527            placement_policy,
528            last_add_confirmed,
529            ledger_length,
530        );
531        let ensemble = metadata.last_ensemble();
532        loop {
533            let entry_id = last_add_confirmed + 1;
534            let payload = match self.read_parallelly(entry_id, true, &ensemble.bookies).await {
535                Err(e) => {
536                    let kind = e.kind();
537                    if kind == ErrorKind::EntryNotExisted || kind == ErrorKind::LedgerNotExisted {
538                        break;
539                    }
540                    return Err(e);
541                },
542                Ok(fetched_entry) => fetched_entry.payload,
543            };
544            let (sender, receiver) = oneshot::channel();
545            if request_sender.send(WriteRequest::Append { entry_id, payload, responser: sender }).is_err() {
546                let err = BkError::with_description(ErrorKind::UnexpectedError, &"writer closed during recovery");
547                return Err(err);
548            }
549            receiver.await.map_err(|_| {
550                BkError::with_description(ErrorKind::UnexpectedError, &"writer failure during recovery")
551            })??;
552            last_add_confirmed = entry_id;
553        }
554        let (close_sender, close_receiver) = oneshot::channel();
555        request_sender.send(WriteRequest::Close { responser: close_sender }).unwrap();
556        let metadata = close_receiver.await.unwrap()?;
557        self.update_metadata(metadata);
558        Ok(())
559    }
560
    /// Returns whether ledger has been closed or not.
    ///
    /// Based on the locally cached metadata; a ledger closed elsewhere may
    /// still report `false` until the cache is refreshed.
    pub fn closed(&self) -> bool {
        self.metadata.borrow().closed()
    }
565}