bookkeeper_client/client/writer.rs

1use std::cmp::Ordering;
2use std::collections::{HashSet, VecDeque};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use ignore_result::Ignore;
11use static_assertions::assert_impl_all;
12use tokio::select;
13use tokio::sync::{mpsc, oneshot};
14use tokio::time::{self, Sleep};
15
16use super::bookie::{self, PoolledClient};
17use super::digest::Algorithm as DigestAlgorithm;
18use super::entry_distribution::{AckSet, EntryDistribution, HasEntryDistribution, WriteSet};
19use super::errors::{BkError, ErrorKind};
20use super::local_rc::LocalRc;
21use super::metadata::{
22    AtomicEntryId,
23    BookieId,
24    EntryId,
25    HasLedgerMetadata,
26    LedgerEnsemble,
27    LedgerId,
28    LedgerLength,
29    LedgerMetadata,
30    LedgerState,
31};
32use super::placement::{EnsembleOptions, PlacementPolicy, RandomPlacementPolicy};
33use super::reader::LedgerReader;
34use crate::future::{SelectAll, SelectIterable};
35use crate::meta::{MetaStore, MetaVersion, Versioned};
36
37type Result<T> = std::result::Result<T, BkError>;
38
39#[derive(Clone, Copy, Debug)]
40pub struct AddOptions {
41    recovery_add: bool,
42    high_priority: bool,
43    last_add_confirmed: EntryId,
44    ledger_length: LedgerLength,
45}
46
/// Options to close a ledger.
///
/// Currently carries no settings.
#[derive(Default)]
pub struct CloseOptions {}
50
/// Settings used when opening a ledger for writing.
pub(crate) struct WriterOptions {
    /// Whether entries are written with deferred sync; presumably copied into
    /// `LedgerWriter::deferred_sync` — TODO confirm at the construction site.
    pub deferred_sync: bool,
    /// Ledger master key (20 bytes).
    pub master_key: [u8; 20],
    /// Digest algorithm used for entries (presumably for entry checksums — verify).
    pub digest_algorithm: DigestAlgorithm,
}
56
57struct AddEntryFuture<F: Future> {
58    entry_id: EntryId,
59    payload: Vec<u8>,
60    recovery: bool,
61    last_add_confirmed: EntryId,
62    ledger_length: LedgerLength,
63    write_set: WriteSet,
64    ack_set: AckSet,
65    write_futures: Vec<Fuse<F>>,
66    responser: Option<oneshot::Sender<Result<AddedEntry>>>,
67}
68
69impl<F: Future> AddEntryFuture<F> {
70    fn to_bookie_index(&self, write_index: usize) -> usize {
71        self.write_set.bookie_index(write_index)
72    }
73
74    fn start_write<'a, 'b, AddEntryFn>(&'a mut self, bookies: &[BookieId], add_entry: &AddEntryFn)
75    where
76        AddEntryFn: Fn(BookieId, EntryId, &'b [u8], AddOptions) -> F, {
77        let options = AddOptions {
78            last_add_confirmed: self.last_add_confirmed,
79            ledger_length: self.ledger_length,
80            recovery_add: self.recovery,
81            high_priority: false,
82        };
83        let payload = unsafe { std::mem::transmute::<&[u8], &'_ [u8]>(self.payload.as_slice()) };
84        for bookie_index in self.write_set.iter() {
85            let bookie_id = bookies[bookie_index].clone();
86            self.write_futures.push(add_entry(bookie_id, self.entry_id, payload, options).fuse());
87        }
88    }
89
90    fn update_write<'a, 'b, AddEntryFn>(&'a mut self, changed: &[bool], bookies: &[BookieId], add_entry: &AddEntryFn)
91    where
92        AddEntryFn: Fn(BookieId, EntryId, &'b [u8], AddOptions) -> F, {
93        let options = AddOptions {
94            last_add_confirmed: self.last_add_confirmed,
95            ledger_length: self.ledger_length,
96            recovery_add: self.recovery,
97            high_priority: false,
98        };
99        let payload = unsafe { std::mem::transmute::<&[u8], &'_ [u8]>(self.payload.as_slice()) };
100        for (write_index, bookie_index) in self.write_set.iter().enumerate() {
101            if !changed[bookie_index] {
102                continue;
103            }
104            let bookie_id = bookies[bookie_index].clone();
105            self.write_futures[write_index] = add_entry(bookie_id, self.entry_id, payload, options).fuse();
106            self.ack_set.unset_write(write_index);
107        }
108    }
109
110    fn terminate_write(&mut self, changed: &[bool]) {
111        for (write_index, bookie_index) in self.write_set.iter().enumerate() {
112            if changed[bookie_index] {
113                self.write_futures[write_index] = Fuse::terminated();
114            }
115        }
116    }
117}
118
// NOTE(review): presumably sound because the write futures live in the heap buffer of
// `write_futures`, which is filled before the first poll and afterwards only overwritten in
// place, so moving an `AddEntryFuture` never moves a polled future — confirm before relying on it.
impl<F: Future> Unpin for AddEntryFuture<F> {}
// NOTE(review): this asserts `Send` even when `F` is not `Send`, which is unsound if a `!Send`
// write future is ever moved across threads. Consider bounding on `F: Send` once every
// instantiation is confirmed to satisfy it.
unsafe impl<F: Future> Send for AddEntryFuture<F> {}
121
122impl<F: Future> Future for AddEntryFuture<F> {
123    type Output = (usize, F::Output);
124
125    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126        let mut iter = SelectIterable::new(&mut self.write_futures);
127        let pinned = unsafe { Pin::new_unchecked(&mut iter) };
128        pinned.poll(cx)
129    }
130}
131
132impl<F: Future> FusedFuture for AddEntryFuture<F> {
133    fn is_terminated(&self) -> bool {
134        return self.write_futures.iter().all(|f| f.is_terminated());
135    }
136}
137
/// Configuration and collaborators shared by the write loop of one ledger.
pub(crate) struct LedgerWriter {
    pub ledger_id: LedgerId,
    /// Bookie client; `prepare_ensemble` is called on it whenever the write ensemble changes.
    pub client: Arc<PoolledClient>,
    /// Entry distribution used to build write and ack sets for new entries.
    pub entry_distribution: EntryDistribution,
    pub placement_policy: Arc<RandomPlacementPolicy>,
    pub meta_store: Arc<dyn MetaStore>,
    pub master_key: [u8; 20],
    pub digest_algorithm: DigestAlgorithm,

    /// When set, acknowledged adds report the current LAC instead of their own id, and the LAC
    /// only advances through completed forces.
    pub deferred_sync: bool,
}
149
/// Progress of the close that is started once every request sender is gone.
enum Termination {
    /// No termination requested.
    None,
    /// The termination close was started and has not finished yet.
    Terminating,
    /// The termination close succeeded.
    Terminated,
    /// The termination close failed with this error.
    Error(BkError),
}
156
157impl Termination {
158    fn next(&mut self) {
159        if matches!(self, Termination::Terminating) {
160            *self = Termination::Terminated;
161        }
162    }
163
164    fn next_with_err(&mut self, err: BkError) {
165        if matches!(self, Termination::Terminating) {
166            *self = Termination::Error(err);
167        }
168    }
169
170    fn terminated(&self) -> bool {
171        matches!(self, Termination::Terminated | Termination::Error(_))
172    }
173
174    fn err(&self) -> Option<&BkError> {
175        match self {
176            Termination::Error(err) => Some(err),
177            _ => None,
178        }
179    }
180}
181
/// Mutable state of the write loop of a single ledger.
///
/// Every external effect is injected as a closure (`EntryWriter`, `LedgerForcer`,
/// `LedgerCloser`, `EnsembleChanger`, `LacFlusher`); the state machine only drives the futures
/// they return.
struct WriterState<
    'a,
    AddFuture,
    EnsembleFuture,
    ForceFuture,
    CloseFuture,
    LacFuture,
    EntryWriter,
    LedgerForcer,
    LedgerCloser,
    EnsembleChanger,
    LacFlusher,
> where
    AddFuture: Future,
    EnsembleFuture: Future,
    ForceFuture: Future,
    CloseFuture: Future,
    LacFuture: Future,
    EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
    LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
    LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
    EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
    LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture, {
    /// Ledger state as known to this writer.
    state: LedgerState,
    /// Whether the writer works on a ledger that is in recovery.
    recovery: bool,
    /// Ensembles chosen locally during recovery; handed to the closer.
    recovery_ensembles: Vec<LedgerEnsemble>,

    /// First fatal error; once set, appends are rejected with it.
    fatal: Option<BkError>,
    /// A close is in progress.
    closing: bool,
    /// Progress of the close started when all request senders are gone.
    termination: Termination,
    /// Running close, terminated when idle.
    close_future: Fuse<CloseFuture>,
    /// Callers waiting for the in-progress close.
    close_waiters: Vec<oneshot::Sender<Result<Versioned<LedgerMetadata>>>>,
    ledger_closer: LedgerCloser,

    writer: &'a LedgerWriter,
    /// Latest metadata known to this writer.
    metadata: LocalRc<Versioned<LedgerMetadata>>,
    /// Ensemble currently receiving writes.
    write_ensemble: LedgerEnsemble,
    /// Scratch flags (per ensemble position) for positions replaced by the last ensemble update.
    changed_bookies: Box<[bool]>,

    write_quorum_size: usize,

    /// Id given to the most recently appended entry.
    last_adding_entry_id: EntryId,
    /// Id of the most recently acknowledged entry.
    last_add_entry_id: EntryId,
    last_add_confirmed: EntryId,
    /// Ledger length through `last_add_entry_id`.
    last_add_ledger_length: LedgerLength,
    /// Ledger length through `last_adding_entry_id`.
    ledger_length: LedgerLength,

    /// Entries still waiting for their ack quorum, in entry-id order.
    adding_entries: VecDeque<AddEntryFuture<AddFuture>>,
    /// Acknowledged entries whose remaining bookie writes are still running.
    confirmed_entries: VecDeque<AddEntryFuture<AddFuture>>,
    /// Cleared write-future vectors kept for reuse by later appends.
    released_futures: Vec<Vec<Fuse<AddFuture>>>,

    /// Running force, terminated when idle.
    force_future: Fuse<ForceFuture>,
    /// Last acknowledged entry covered by the latest force.
    force_entry_id: EntryId,
    /// Force requests waiting for the LAC to reach their `last_add_entry_id`.
    pending_forces: VecDeque<PendingForce>,
    ledger_forcer: LedgerForcer,

    /// Per ensemble position: a write to that bookie failed and it was not replaced yet.
    failed_bookies: Vec<bool>,
    /// Whether some bookie still needs replacement.
    has_failed_bookies: bool,
    /// Running metadata ensemble change, terminated when idle.
    ensemble_changing: Fuse<EnsembleFuture>,
    ensemble_changer: EnsembleChanger,

    /// Running LAC flush, terminated when idle.
    lac_future: Fuse<LacFuture>,
    /// Timer for the next LAC flush, terminated when disarmed.
    lac_timeout: Fuse<Sleep>,
    /// LAC reported by the last completed flush.
    lac_flushed: EntryId,
    lac_flusher: LacFlusher,
    /// LAC flush interval; zero disables flushing.
    lac_duration: Duration,

    entry_writer: EntryWriter,
}
251
252impl<
253        AddFuture,
254        EnsembleFuture,
255        ForceFuture,
256        CloseFuture,
257        LacFuture,
258        EntryWriter,
259        LedgerForcer,
260        LedgerCloser,
261        EnsembleChanger,
262        LacFlusher,
263    >
264    WriterState<
265        '_,
266        AddFuture,
267        EnsembleFuture,
268        ForceFuture,
269        CloseFuture,
270        LacFuture,
271        EntryWriter,
272        LedgerForcer,
273        LedgerCloser,
274        EnsembleChanger,
275        LacFlusher,
276    >
277where
278    AddFuture: Future<Output = Result<()>>,
279    EnsembleFuture: Future<Output = Result<Versioned<LedgerMetadata>>>,
280    ForceFuture: Future<Output = Result<EntryId>>,
281    CloseFuture: Future<Output = Result<Versioned<LedgerMetadata>>>,
282    LacFuture: Future<Output = EntryId>,
283    EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
284    LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
285    LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
286    EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
287    LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture,
288{
289    async fn run_once(&mut self) -> Option<Versioned<LedgerMetadata>> {
290        self.check_ensemble();
291        futures::select! {
292            (entry_index, (write_index, r)) = SelectIterable::next(&mut self.adding_entries).fuse() => {
293                self.complete_add(entry_index, write_index, r);
294            },
295            (entry_index, (write_index, r)) = SelectIterable::next(&mut self.confirmed_entries).fuse() => {
296                self.complete_confirmed(entry_index, write_index, r);
297            },
298            r = unsafe {Pin::new_unchecked(&mut self.ensemble_changing)} => {
299                let new_metadata = match r {
300                    Err(err) => {
301                        self.on_fatal_error(err);
302                        return None;
303                    },
304                    Ok(new_metadata) => new_metadata,
305                };
306                self.complete_ensemble_change(new_metadata.clone());
307                return Some(new_metadata);
308            },
309            r = unsafe { Pin::new_unchecked(&mut self.force_future) } => {
310                match r {
311                    Err(err) => self.fail_ledger_force(err),
312                    Ok(last_add_confirmed) => self.complete_ledger_force(last_add_confirmed),
313                }
314            },
315            r = unsafe { Pin::new_unchecked(&mut self.close_future) } => {
316                match r {
317                    Err(err) => self.fail_ledger_close(err),
318                    Ok(metadata) => {
319                        self.complete_ledger_close(metadata.clone());
320                        return Some(metadata);
321                    },
322                }
323            },
324            lac = unsafe { Pin::new_unchecked(&mut self.lac_future) } => {
325                self.lac_flushed = lac;
326            },
327            _ = unsafe { Pin::new_unchecked(&mut self.lac_timeout) } => {
328                self.on_lac_timeout();
329            },
330        }
331        None
332    }
333
334    fn check_ensemble(&mut self) {
335        if !self.has_failed_bookies || self.fatal.is_some() || self.last_add_entry_id != self.last_add_confirmed {
336            return;
337        }
338        if self.recovery {
339            let bookies = match self.writer.select_ensemble(&self.metadata.value, &self.failed_bookies) {
340                Err(_) => {
341                    return;
342                },
343                Ok(bookies) => bookies,
344            };
345            let first_entry_id = self.last_add_confirmed + 1;
346            let ensemble = LedgerEnsemble { first_entry_id, bookies };
347            let replace = self.recovery_ensembles.last().map(|e| e.first_entry_id == first_entry_id).unwrap_or(false);
348            if replace {
349                self.recovery_ensembles.pop();
350            }
351            self.update_write_ensemble(&ensemble.bookies);
352            self.recovery_ensembles.push(ensemble);
353            return;
354        }
355        if self.ensemble_changing.is_terminated() {
356            self.ensemble_changing = (self.ensemble_changer)(
357                self.metadata.as_ref().clone(),
358                self.last_add_confirmed,
359                self.failed_bookies.clone(),
360            )
361            .fuse();
362        }
363    }
364
365    fn closed(&self) -> bool {
366        self.state == LedgerState::Closed
367    }
368
369    fn terminate(&mut self) {
370        self.termination = Termination::Terminating;
371        let (sender, _) = oneshot::channel();
372        self.close_ledger(sender);
373    }
374
375    fn terminated(&self) -> bool {
376        self.closed() || self.termination.terminated()
377    }
378
379    fn close_ledger(&mut self, responser: oneshot::Sender<Result<Versioned<LedgerMetadata>>>) {
380        if self.closed() {
381            let metadata = self.metadata.as_ref().clone();
382            responser.send(Ok(metadata)).ignore();
383            return;
384        } else if self.closing {
385            self.close_waiters.push(responser);
386            return;
387        }
388        self.closing = true;
389        self.close_waiters.push(responser);
390        self.confirmed_entries.clear();
391        self.ensemble_changing = Fuse::terminated();
392        self.has_failed_bookies = false;
393        self.check_pending_close();
394    }
395
396    fn check_pending_close(&mut self) {
397        if self.close_waiters.is_empty() || !self.adding_entries.is_empty() || !self.close_future.is_terminated() {
398            return;
399        }
400        if self.last_add_confirmed < self.last_add_entry_id {
401            self.start_ledger_force();
402            return;
403        }
404        let ensembles = self.recovery_ensembles.clone();
405        self.close_future = (self.ledger_closer)(
406            self.metadata.as_ref().clone(),
407            self.last_add_confirmed,
408            self.last_add_ledger_length,
409            self.recovery,
410            ensembles,
411        )
412        .fuse();
413    }
414
415    fn drain_ledger_close(&mut self, err: BkError) {
416        self.closing = false;
417        self.close_waiters.drain(..).for_each(|responser| {
418            responser.send(Err(err.clone())).ignore();
419        });
420        self.termination.next_with_err(err);
421    }
422
423    fn fail_ledger_close(&mut self, err: BkError) {
424        self.drain_ledger_close(err);
425    }
426
427    fn complete_ledger_close(&mut self, metadata: Versioned<LedgerMetadata>) {
428        self.state = LedgerState::Closed;
429        self.closing = false;
430        self.termination.next();
431        self.metadata = LocalRc::new(metadata.clone());
432        self.close_waiters.drain(..).for_each(|responser| {
433            responser.send(Ok(metadata.clone())).ignore();
434        });
435    }
436
437    fn check_append<R>(&self, responser: oneshot::Sender<Result<R>>) -> Option<oneshot::Sender<Result<R>>> {
438        if self.closed() {
439            responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore();
440            return None;
441        } else if let Some(err) = &self.fatal {
442            responser.send(Err(err.clone())).ignore();
443            return None;
444        } else if self.closing {
445            responser.send(Err(BkError::new(ErrorKind::LedgerClosing))).ignore();
446            return None;
447        }
448        Some(responser)
449    }
450
451    fn append_entry(
452        &mut self,
453        expected_entry_id: EntryId,
454        payload: Vec<u8>,
455        responser: oneshot::Sender<Result<AddedEntry>>,
456    ) {
457        let Some(responser) = self.check_append(responser) else {
458            return;
459        };
460        let entry_id = self.last_adding_entry_id + 1;
461        if expected_entry_id.is_valid() && expected_entry_id != entry_id {
462            let msg = format!("expect next entry id {}, got {}", entry_id, expected_entry_id);
463            let err = BkError::with_message(ErrorKind::UnexpectedError, msg);
464            responser.send(Err(err)).ignore();
465            return;
466        };
467        self.ledger_length += payload.len() as i64;
468        self.last_adding_entry_id = entry_id;
469        let mut add_entry_future = AddEntryFuture {
470            entry_id,
471            payload,
472            recovery: self.recovery,
473            last_add_confirmed: self.last_add_confirmed,
474            ledger_length: self.ledger_length,
475            write_set: self.new_write_set(entry_id),
476            ack_set: self.new_ack_set(),
477            write_futures: self.released_futures.pop().unwrap_or_else(|| Vec::with_capacity(self.write_quorum_size)),
478            responser: Some(responser),
479        };
480        add_entry_future.start_write(&self.write_ensemble.bookies, &self.entry_writer);
481        self.adding_entries.push_back(add_entry_future);
482    }
483
484    fn is_terminal_error(err: ErrorKind) -> bool {
485        matches!(err, ErrorKind::LedgerFenced | ErrorKind::UnauthorizedAccess)
486    }
487
488    fn on_fatal_error(&mut self, err: BkError) {
489        if self.fatal.is_some() {
490            return;
491        }
492        self.abort_write(&err);
493        self.fatal = Some(err);
494    }
495
496    fn abort_write(&mut self, err: &BkError) {
497        while let Some(entry_future) = self.adding_entries.pop_front() {
498            let responser = entry_future.responser.unwrap();
499            responser.send(Err(err.clone())).ignore();
500        }
501        self.confirmed_entries.clear();
502        self.released_futures.clear();
503        self.ensemble_changing = Fuse::terminated();
504    }
505
506    fn complete_add(&mut self, entry_index: usize, write_index: usize, result: Result<()>) {
507        if let Err(err) = result {
508            if Self::is_terminal_error(err.kind()) {
509                self.on_fatal_error(err);
510                return;
511            }
512            let entry_future = &mut self.adding_entries[entry_index];
513            let bookie_index = entry_future.to_bookie_index(write_index);
514            self.failed_bookies[bookie_index] = true;
515            self.has_failed_bookies = true;
516            return;
517        }
518        let entry_future = &mut self.adding_entries[entry_index];
519        let completed = entry_future.ack_set.complete_write(write_index);
520        if entry_index != 0 || !completed || !self.ensemble_changing.is_terminated() {
521            return;
522        }
523        self.confirm_acked_entries();
524    }
525
526    fn complete_confirmed(&mut self, entry_index: usize, write_index: usize, result: Result<()>) {
527        let entry_future = &self.confirmed_entries[entry_index];
528        if result.is_err() {
529            let bookie_index = entry_future.to_bookie_index(write_index);
530            self.failed_bookies[bookie_index] = true;
531            self.has_failed_bookies = true;
532        }
533        if entry_future.is_terminated() {
534            let AddEntryFuture { mut write_futures, .. } =
535                self.confirmed_entries.swap_remove_back(entry_index).unwrap();
536            write_futures.clear();
537            self.released_futures.push(write_futures);
538        }
539    }
540
541    fn confirm_acked_entries(&mut self) {
542        while let Some(entry_future) = self.adding_entries.front() {
543            if !entry_future.ack_set.completed() {
544                break;
545            }
546            let mut entry_future = self.adding_entries.pop_front().unwrap();
547            let responser = entry_future.responser.take().unwrap();
548            let added_entry = AddedEntry {
549                entry_id: entry_future.entry_id,
550                last_add_confirmed: if self.writer.deferred_sync {
551                    self.last_add_confirmed
552                } else {
553                    entry_future.entry_id
554                },
555            };
556            responser.send(Ok(added_entry)).ignore();
557            self.last_add_entry_id = entry_future.entry_id;
558            self.last_add_ledger_length += entry_future.payload.len();
559            if !entry_future.is_terminated() {
560                self.confirmed_entries.push_back(entry_future);
561            } else {
562                entry_future.write_futures.clear();
563                self.released_futures.push(entry_future.write_futures);
564            }
565        }
566        if !self.writer.deferred_sync {
567            self.update_last_add_confirmed(self.last_add_entry_id);
568        }
569        self.check_pending_close();
570    }
571
572    fn update_last_add_confirmed(&mut self, last_add_confirmed: EntryId) {
573        self.last_add_confirmed = last_add_confirmed;
574        if last_add_confirmed > self.lac_flushed && !self.lac_duration.is_zero() && self.lac_timeout.is_terminated() {
575            self.lac_timeout = time::sleep(self.lac_duration).fuse();
576        }
577    }
578
579    fn on_lac_timeout(&mut self) {
580        if self.last_add_confirmed == self.lac_flushed {
581            return;
582        }
583        if self.lac_future.is_terminated() && self.ensemble_changing.is_terminated() {
584            self.lac_future = (self.lac_flusher)(self.write_ensemble.bookies.clone(), self.last_add_confirmed).fuse();
585        }
586        self.lac_timeout = time::sleep(self.lac_duration).fuse();
587    }
588
589    fn force_ledger(&mut self, entry_id: EntryId, responser: oneshot::Sender<Result<EntryId>>) {
590        if self.last_add_confirmed == self.last_add_entry_id || self.last_add_confirmed >= entry_id {
591            responser.send(Ok(self.last_add_confirmed)).ignore();
592            return;
593        }
594        self.pending_forces.push_back(PendingForce { last_add_entry_id: self.last_add_entry_id, responser });
595        self.start_ledger_force();
596    }
597
598    fn force_last_added_entry(&mut self) {
599        self.force_future = (self.ledger_forcer)(self.write_ensemble.bookies.clone(), self.last_add_entry_id).fuse();
600        self.force_entry_id = self.last_add_entry_id;
601    }
602
603    fn start_ledger_force(&mut self) {
604        if self.force_future.is_terminated() {
605            self.force_last_added_entry();
606        }
607    }
608
609    fn fail_ledger_force(&mut self, err: BkError) {
610        self.pending_forces.drain(..).for_each(|force| {
611            force.responser.send(Err(err.clone())).ignore();
612        });
613        if self.adding_entries.is_empty() && self.force_entry_id == self.last_add_entry_id {
614            let err = BkError::new(ErrorKind::LedgerForceFailed).cause_by(err);
615            self.drain_ledger_close(err);
616        }
617    }
618
619    fn complete_ledger_force(&mut self, last_add_confirmed: EntryId) {
620        self.update_last_add_confirmed(last_add_confirmed);
621        while let Some(pending_force) = self.pending_forces.front() {
622            if pending_force.last_add_entry_id > self.last_add_confirmed {
623                self.force_last_added_entry();
624                return;
625            }
626            let pending_force = self.pending_forces.pop_front().unwrap();
627            pending_force.responser.send(Ok(self.last_add_confirmed)).ignore();
628        }
629        self.check_pending_close();
630    }
631
632    fn update_write_ensemble(&mut self, new_bookies: &[BookieId]) {
633        let bookies = &mut self.write_ensemble.bookies;
634        let changed_bookies = self.changed_bookies.as_mut();
635        self.has_failed_bookies = false;
636        for (i, bookie_id) in new_bookies.iter().enumerate() {
637            if bookies[i] == *bookie_id {
638                self.has_failed_bookies |= self.failed_bookies[i];
639            } else {
640                bookies[i] = bookie_id.clone();
641                changed_bookies[i] = true;
642                self.failed_bookies[i] = false;
643            }
644        }
645        self.on_write_ensemble_changed();
646        self.changed_bookies.fill(false);
647    }
648
649    fn on_write_ensemble_changed(&mut self) {
650        let bookies = &self.write_ensemble.bookies;
651        let changed_bookies = self.changed_bookies.as_ref();
652        let mut i = 0usize;
653        while i < self.confirmed_entries.len() {
654            let entry_future = &mut self.confirmed_entries[i];
655            entry_future.terminate_write(changed_bookies);
656            if !entry_future.is_terminated() {
657                i += 1;
658                continue;
659            }
660            let AddEntryFuture { mut write_futures, .. } = self.confirmed_entries.swap_remove_back(i).unwrap();
661            write_futures.clear();
662            self.released_futures.push(write_futures);
663        }
664        for entry_future in self.adding_entries.iter_mut() {
665            entry_future.update_write(changed_bookies, bookies, &self.entry_writer);
666        }
667        self.writer.client.prepare_ensemble(bookies.as_slice());
668        self.confirm_acked_entries();
669    }
670
671    fn complete_ensemble_change(&mut self, new_metadata: Versioned<LedgerMetadata>) {
672        let state = new_metadata.value.state;
673        if state == LedgerState::InRecovery {
674            let err = BkError::new(ErrorKind::LedgerFenced);
675            self.abort_write(&err);
676            self.fatal = Some(err.clone());
677        } else if state == LedgerState::Closed {
678            self.state = LedgerState::Closed;
679            self.abort_write(&BkError::new(ErrorKind::LedgerClosed));
680        } else {
681            let last_ensemble = new_metadata.last_ensemble();
682            self.update_write_ensemble(&last_ensemble.bookies);
683        }
684        self.metadata = LocalRc::new(new_metadata);
685    }
686}
687
688impl<
689        AddFuture,
690        EnsembleFuture,
691        ForceFuture,
692        CloseFuture,
693        LacFuture,
694        EntryWriter,
695        LedgerForcer,
696        LedgerCloser,
697        EnsembleChanger,
698        LacFlusher,
699    > HasEntryDistribution
700    for WriterState<
701        '_,
702        AddFuture,
703        EnsembleFuture,
704        ForceFuture,
705        CloseFuture,
706        LacFuture,
707        EntryWriter,
708        LedgerForcer,
709        LedgerCloser,
710        EnsembleChanger,
711        LacFlusher,
712    >
713where
714    AddFuture: Future,
715    EnsembleFuture: Future,
716    ForceFuture: Future,
717    CloseFuture: Future,
718    LacFuture: Future,
719    EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
720    LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
721    LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
722    EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
723    LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture,
724{
725    fn entry_distribution(&self) -> &EntryDistribution {
726        self.writer.entry_distribution()
727    }
728}
729
730impl HasEntryDistribution for LedgerWriter {
731    fn entry_distribution(&self) -> &EntryDistribution {
732        &self.entry_distribution
733    }
734}
735
736impl LedgerWriter {
737    pub(crate) async fn write_state_loop(
738        &self,
739        metadata: Versioned<LedgerMetadata>,
740        last_confirmed_entry_id: EntryId,
741        last_confirmed_ledger_length: LedgerLength,
742        mut request_receiver: mpsc::UnboundedReceiver<WriteRequest>,
743        metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
744    ) {
745        let ensemble_size = metadata.value.ensemble_size as usize;
746        let write_quorum_size = metadata.value.write_quorum_size as usize;
747        let last_ensemble = metadata.last_ensemble().clone();
748        let ledger_state = metadata.value.state;
749        let mut state = WriterState {
750            writer: self,
751            state: ledger_state,
752            recovery: ledger_state == LedgerState::InRecovery,
753            recovery_ensembles: Vec::new(),
754
755            fatal: None,
756            closing: false,
757            termination: Termination::None,
758            close_future: Fuse::terminated(),
759            close_waiters: Vec::with_capacity(5),
760            ledger_closer: |metadata, last_add_confirmed, ledger_length, recovery, ensembles| {
761                self.close_ledger(metadata, last_add_confirmed, ledger_length, recovery, ensembles)
762            },
763
764            metadata: LocalRc::new(metadata),
765            write_ensemble: last_ensemble,
766            changed_bookies: vec![false; ensemble_size].into_boxed_slice(),
767            write_quorum_size,
768
769            adding_entries: VecDeque::new(),
770            confirmed_entries: VecDeque::new(),
771            released_futures: Vec::new(),
772
773            last_adding_entry_id: last_confirmed_entry_id,
774            last_add_entry_id: last_confirmed_entry_id,
775            last_add_confirmed: last_confirmed_entry_id,
776            last_add_ledger_length: last_confirmed_ledger_length,
777            ledger_length: last_confirmed_ledger_length,
778
779            force_future: Fuse::terminated(),
780            force_entry_id: EntryId::INVALID,
781            pending_forces: VecDeque::new(),
782            ledger_forcer: |ensemble, last_add_entry_id| self.force_ledger(ensemble, last_add_entry_id),
783
784            failed_bookies: vec![false; ensemble_size],
785            has_failed_bookies: false,
786            ensemble_changing: Fuse::terminated(),
787
788            lac_future: Fuse::terminated(),
789            lac_timeout: Fuse::terminated(),
790            lac_flushed: last_confirmed_entry_id,
791            lac_flusher: |ensemble, last_add_confirmed| self.write_lac(ensemble, last_add_confirmed),
792            lac_duration: Duration::from_secs(1),
793
794            entry_writer: |bookie_id, entry_id, payload, options| self.add_entry(bookie_id, entry_id, payload, options),
795            ensemble_changer: |metadata: Versioned<LedgerMetadata>, last_add_confirmed, failed_bookies| {
796                self.change_ensemble(metadata.version, metadata.value, last_add_confirmed + 1, failed_bookies)
797            },
798        };
799
800        let mut channel_closed = false;
801        let mut metadata_sending = Fuse::terminated();
802        while !state.terminated() {
803            select! {
804                request = request_receiver.recv(), if !channel_closed => {
805                    let Some(request) = request else {
806                        channel_closed = true;
807                        state.terminate();
808                        continue;
809                    };
810                    match request {
811                        WriteRequest::Append{entry_id, payload, responser} => {
812                            state.append_entry(entry_id, payload, responser);
813                        },
814                        WriteRequest::Force{entry_id, responser} => {
815                            state.force_ledger(entry_id, responser);
816                        },
817                        WriteRequest::Close{responser} => {
818                            state.close_ledger(responser);
819                        },
820                    }
821                },
822                new_metadata = state.run_once() => {
823                    if let Some(metadata) = new_metadata {
824                        metadata_sending = metadata_sender.send(metadata).fuse();
825                    }
826                },
827                _ = unsafe {Pin::new_unchecked(&mut metadata_sending)} => {},
828            }
829        }
830        if let Some(err) = state.termination.err() {
831            log::warn!("ledger {} all writers dropped, but ledger close failed: {:?}", self.ledger_id, err);
832            return;
833        }
834        request_receiver.close();
835        while let Some(request) = request_receiver.recv().await {
836            // We are processing request, so ledger was closed successfully.
837            match request {
838                WriteRequest::Append { responser, .. } => {
839                    responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
840                },
841                WriteRequest::Force { responser, .. } => {
842                    responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
843                },
844                WriteRequest::Close { responser } => {
845                    responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
846                },
847            }
848        }
849    }
850
851    fn select_ensemble(&self, metadata: &LedgerMetadata, failed_bookies: &[bool]) -> Result<Vec<BookieId>> {
852        let ensemble = metadata.last_ensemble();
853        let ensemble_size = ensemble.bookies.len();
854        let mut preferred_bookies = Vec::with_capacity(ensemble_size);
855        let mut excluded_bookies = HashSet::with_capacity(ensemble_size);
856        for (i, bookie_id) in ensemble.bookies.iter().enumerate() {
857            if failed_bookies[i] {
858                excluded_bookies.insert(bookie_id);
859            } else {
860                preferred_bookies.push(bookie_id);
861            }
862        }
863        if excluded_bookies.is_empty() {
864            return Ok(ensemble.bookies.to_vec());
865        }
866        let options = EnsembleOptions {
867            ensemble_size: metadata.ensemble_size,
868            write_quorum: metadata.write_quorum_size,
869            ack_quorum: metadata.ack_quorum_size,
870            custom_metadata: &metadata.custom_metadata,
871            preferred_bookies: &preferred_bookies,
872            excluded_bookies: excluded_bookies.clone(),
873        };
874        let mut selected_bookies = self.placement_policy.select_ensemble(&options)?;
875        let mut ordered_bookies = Vec::with_capacity(ensemble_size);
876        for bookie_id in preferred_bookies.into_iter() {
877            if let Some(j) = selected_bookies.iter().position(|id| id == bookie_id) {
878                ordered_bookies.push(selected_bookies.swap_remove(j));
879            }
880        }
881        for (i, bookie_id) in ensemble.bookies.iter().enumerate() {
882            if !failed_bookies[i] && i < ordered_bookies.len() && *bookie_id == ordered_bookies[i] {
883                continue;
884            }
885            ordered_bookies.insert(i, selected_bookies.swap_remove(0));
886        }
887        Ok(ordered_bookies)
888    }
889
890    async fn change_ensemble(
891        &self,
892        mut version: MetaVersion,
893        mut metadata: LedgerMetadata,
894        first_entry_id: EntryId,
895        failed_bookies: Vec<bool>,
896    ) -> Result<Versioned<LedgerMetadata>> {
897        let mut failed_ensemble_container = None;
898        let original_bookies =
899            unsafe { std::mem::transmute::<&[BookieId], &'_ [BookieId]>(metadata.last_ensemble().bookies.as_slice()) };
900        let ensemble_size = metadata.ensemble_size as usize;
901        let mut excluded_bookies = HashSet::with_capacity(ensemble_size);
902        for (i, bookie_id) in original_bookies.iter().enumerate() {
903            if failed_bookies[i] {
904                excluded_bookies.insert(bookie_id);
905            }
906        }
907        let mut preferred_bookies = Vec::with_capacity(ensemble_size);
908        let mut ordered_bookies = Vec::with_capacity(ensemble_size);
909        loop {
910            if excluded_bookies.is_empty() {
911                return Ok(Versioned::new(version, metadata));
912            }
913            let ensemble = metadata.last_ensemble();
914            if first_entry_id < ensemble.first_entry_id {
915                let err = BkError::with_description(ErrorKind::UnexpectedError, &"ledger ensemble changed");
916                return Err(err);
917            }
918            let preferred_bookies =
919                unsafe { std::mem::transmute::<&mut Vec<&BookieId>, &'_ mut Vec<&BookieId>>(&mut preferred_bookies) };
920            preferred_bookies.extend(ensemble.bookies.iter().filter(|bookie_id| !excluded_bookies.contains(bookie_id)));
921            let options = EnsembleOptions {
922                ensemble_size: metadata.ensemble_size,
923                write_quorum: metadata.write_quorum_size,
924                ack_quorum: metadata.ack_quorum_size,
925                custom_metadata: &metadata.custom_metadata,
926                preferred_bookies: preferred_bookies.as_slice(),
927                excluded_bookies: excluded_bookies.clone(),
928            };
929            let mut selected_bookies = self.placement_policy.select_ensemble(&options)?;
930            for bookie_id in preferred_bookies.iter() {
931                if let Some(i) = selected_bookies.iter().position(|id| id == *bookie_id) {
932                    ordered_bookies.push(selected_bookies.swap_remove(i));
933                }
934            }
935            preferred_bookies.clear();
936            let mut i = 0;
937            while i < ordered_bookies.len() {
938                if ordered_bookies[i] != ensemble.bookies[i] {
939                    ordered_bookies.insert(i, selected_bookies.pop().unwrap());
940                }
941                i += 1;
942            }
943            ordered_bookies.append(&mut selected_bookies);
944
945            if first_entry_id == ensemble.first_entry_id {
946                let replaced_ensemble = metadata.ensembles.pop();
947                failed_ensemble_container = failed_ensemble_container.or(replaced_ensemble);
948            }
949            metadata.ensembles.push(LedgerEnsemble { first_entry_id, bookies: ordered_bookies });
950            match self.meta_store.write_ledger_metadata(&metadata, version).await? {
951                either::Right(new_version) => return Ok(Versioned::new(new_version, metadata)),
952                either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
953                    ordered_bookies = metadata.ensembles.pop().unwrap().bookies;
954                    ordered_bookies.clear();
955                    version = conflicting_version;
956                    metadata = conflicting_metadata;
957                },
958            }
959            if metadata.state != LedgerState::Open {
960                return Ok(Versioned::new(version, metadata));
961            }
962        }
963    }
964
965    async fn close_ledger(
966        &self,
967        metadata: Versioned<LedgerMetadata>,
968        last_add_confirmed: EntryId,
969        ledger_length: LedgerLength,
970        recovery: bool,
971        ensembles: Vec<LedgerEnsemble>,
972    ) -> Result<Versioned<LedgerMetadata>> {
973        let Versioned { mut version, value: mut metadata } = metadata;
974        loop {
975            if recovery {
976                if metadata.state != LedgerState::InRecovery {
977                    let err = BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger not in recovery");
978                    return Err(err);
979                }
980            } else if metadata.state == LedgerState::InRecovery {
981                let err = BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger in recovery");
982                return Err(err);
983            } else if metadata.state == LedgerState::Closed {
984                if metadata.last_entry_id == last_add_confirmed && metadata.length == ledger_length {
985                    return Ok(Versioned::new(version, metadata));
986                }
987                let msg = format!(
988                    "attemp to close ledger with (last_entry_id, length) ({}, {}), but got ({}, {})",
989                    last_add_confirmed, ledger_length, metadata.last_entry_id, metadata.length
990                );
991                let err = BkError::with_message(ErrorKind::LedgerConcurrentClose, msg);
992                return Err(err);
993            }
994            if !ensembles.is_empty() {
995                let last_ensemble_entry_id = metadata.last_ensemble().first_entry_id;
996                match last_ensemble_entry_id.cmp(&ensembles[0].first_entry_id) {
997                    Ordering::Greater => {
998                        let err = BkError::with_description(
999                            ErrorKind::UnexpectedError,
1000                            &"ledger ensembles changed in recovery mode",
1001                        );
1002                        return Err(err);
1003                    },
1004                    Ordering::Less => {
1005                        metadata.ensembles.extend_from_slice(&ensembles);
1006                    },
1007                    Ordering::Equal => {
1008                        metadata.ensembles.pop();
1009                    },
1010                }
1011            }
1012            metadata.state = LedgerState::Closed;
1013            metadata.last_entry_id = last_add_confirmed;
1014            metadata.length = ledger_length;
1015            let r = self.meta_store.write_ledger_metadata(&metadata, version).await?;
1016            match r {
1017                either::Right(version) => return Ok(Versioned::new(version, metadata)),
1018                either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
1019                    version = conflicting_version;
1020                    metadata = conflicting_metadata;
1021                },
1022            }
1023        }
1024    }
1025
1026    async fn write_lac(&self, bookies: Vec<BookieId>, lac: EntryId) -> EntryId {
1027        let options =
1028            bookie::WriteLacOptions { master_key: &self.master_key, digest_algorithm: &self.digest_algorithm };
1029        let n = bookies.len();
1030        let mut futures = Vec::with_capacity(n);
1031        let write_set = self.new_write_set(lac);
1032        for bookie_index in write_set.iter() {
1033            let bookie_id = &bookies[bookie_index];
1034            futures.push(self.client.write_lac(bookie_id, self.ledger_id, lac, &options).fuse());
1035        }
1036        let mut ack_set = self.new_ack_set();
1037        let mut failed = 0;
1038        let mut select_all = SelectAll::new(&mut futures);
1039        let mut err = None;
1040        while failed <= self.entry_distribution.failure_threshold() {
1041            let (write_index, write_result) = select_all.next().await;
1042            if let Err(e) = write_result {
1043                err = err.or(Some(e));
1044                failed += 1;
1045            } else if ack_set.complete_write(write_index) {
1046                return lac;
1047            }
1048        }
1049        lac
1050    }
1051
1052    async fn force_ledger(&self, bookies: Vec<BookieId>, last_add_entry_id: EntryId) -> Result<EntryId> {
1053        let mut futures = Vec::with_capacity(bookies.len());
1054        for bookie_id in bookies.iter() {
1055            futures.push(self.client.force_ledger(bookie_id, self.ledger_id).fuse());
1056        }
1057        let mut select_all = SelectAll::new(&mut futures);
1058        for _ in 0..bookies.len() {
1059            select_all.next().await.1?;
1060        }
1061        Ok(last_add_entry_id)
1062    }
1063
1064    async fn add_entry(
1065        &self,
1066        bookie_id: BookieId,
1067        entry_id: EntryId,
1068        payload: &'static [u8],
1069        options: AddOptions,
1070    ) -> Result<()> {
1071        let adding_entry = bookie::AddingEntry {
1072            ledger_id: self.ledger_id,
1073            entry_id,
1074            last_add_confirmed: options.last_add_confirmed,
1075            accumulated_ledger_length: options.ledger_length,
1076            payload,
1077        };
1078        let add_options = bookie::AddOptions {
1079            recovery_add: options.recovery_add,
1080            high_priority: options.high_priority,
1081            deferred_sync: self.deferred_sync,
1082            master_key: &self.master_key,
1083            digest_algorithm: &self.digest_algorithm,
1084        };
1085        self.client.add_entry(&bookie_id, &adding_entry, &add_options).await?;
1086        Ok(())
1087    }
1088}
1089
/// Ledger appender.
///
/// Cloneable handle that sends append, force and close requests to the ledger writer through
/// `request_sender`, and exposes reads through the embedded [LedgerReader].
#[derive(Clone)]
pub struct LedgerAppender {
    /// Reader of the same ledger; the appender updates its cached last add confirmed and, on
    /// close, its metadata.
    pub(crate) reader: LedgerReader,
    /// Last added entry id, shared by all clones and updated when an append completes
    /// (NOTE(review): presumably `AtomicEntryId::update` keeps the highest id — verify).
    pub(crate) last_add_entry_id: Arc<AtomicEntryId>,
    /// Channel to the writer that handles [WriteRequest]s.
    pub(crate) request_sender: mpsc::UnboundedSender<WriteRequest>,
}

// Compile-time check that appenders can be moved and shared across threads.
assert_impl_all!(LedgerAppender: Send, Sync);
1099
/// Writer's response to a successful [WriteRequest::Append].
#[derive(Debug)]
pub struct AddedEntry {
    /// Id of the appended entry, returned to the caller of `LedgerAppender::append`.
    entry_id: EntryId,
    /// Last add confirmed reported together with the append; cached by the appender.
    last_add_confirmed: EntryId,
}
1105
/// Request sent from a [LedgerAppender] to the ledger writer.
#[derive(Debug)]
pub enum WriteRequest {
    /// Appends `payload`; the outcome is sent back on `responser`.
    ///
    /// [LedgerAppender] sends [EntryId::INVALID] as `entry_id` (presumably the writer assigns
    /// the real id — verify against the writer state).
    Append { entry_id: EntryId, payload: Vec<u8>, responser: oneshot::Sender<Result<AddedEntry>> },
    /// Forces entries up to `entry_id` (the appender's last added entry id). The entry id sent
    /// back is cached by the appender as last add confirmed.
    Force { entry_id: EntryId, responser: oneshot::Sender<Result<EntryId>> },
    /// Closes the ledger; the resulting ledger metadata is sent back on `responser`.
    Close { responser: oneshot::Sender<Result<Versioned<LedgerMetadata>>> },
}
1112
/// A force request that has not been answered yet.
///
/// NOTE(review): presumably answered once entries up to `last_add_entry_id` are synced —
/// confirm against the writer state.
#[derive(Debug)]
struct PendingForce {
    /// Last added entry id the force request covers.
    last_add_entry_id: EntryId,
    /// Channel the force result is sent back on.
    responser: oneshot::Sender<Result<EntryId>>,
}
1118
1119impl LedgerAppender {
1120    pub fn id(&self) -> LedgerId {
1121        self.reader.ledger_id
1122    }
1123
1124    /// Closes ledger.
1125    pub async fn close(&mut self, _options: CloseOptions) -> Result<()> {
1126        let (sender, receiver) = oneshot::channel();
1127        let request = WriteRequest::Close { responser: sender };
1128        if self.request_sender.send(request).is_err() {
1129            return Err(BkError::new(ErrorKind::LedgerClosed));
1130        }
1131        let metadata = receiver.await.unwrap()?;
1132        self.reader.update_metadata(metadata);
1133        Ok(())
1134    }
1135
1136    /// Constructs a ledger reader.
1137    pub fn reader(&self) -> Result<LedgerReader> {
1138        Ok(self.reader.clone())
1139    }
1140
1141    fn last_add_entry_id(&self) -> EntryId {
1142        self.last_add_entry_id.get()
1143    }
1144
1145    /// Gets local cached last_add_confirmed which could vary due to concurrent write.
1146    pub fn last_add_confirmed(&self) -> EntryId {
1147        self.reader.last_add_confirmed()
1148    }
1149
1150    fn update_last_add_entry_id(&self, entry_id: EntryId) -> EntryId {
1151        self.last_add_entry_id.update(entry_id)
1152    }
1153
1154    fn update_last_add_confirmed(&self, last_add_confirmed: EntryId) -> EntryId {
1155        self.reader.metadata.update_lac(last_add_confirmed)
1156    }
1157
1158    /// Syncs writen entries on last ensemble.
1159    ///
1160    /// ## Errors
1161    /// It is indeterministic whether previously written entries are persisted or not if [ErrorKind::LedgerForceFailed] occurs.
1162    pub async fn force(&self) -> Result<()> {
1163        let last_add_entry_id = self.last_add_entry_id();
1164        if last_add_entry_id <= self.last_add_confirmed() {
1165            return Ok(());
1166        }
1167        let (sender, receiver) = oneshot::channel();
1168        let request = WriteRequest::Force { entry_id: last_add_entry_id, responser: sender };
1169        if self.request_sender.send(request).is_err() {
1170            return Err(BkError::new(ErrorKind::LedgerClosed));
1171        }
1172        let last_add_confirmed = receiver.await.unwrap()?;
1173        self.update_last_add_confirmed(last_add_confirmed);
1174        Ok(())
1175    }
1176
1177    async fn wait<T, E, F>(result: std::result::Result<F, E>) -> std::result::Result<T, E>
1178    where
1179        F: Future<Output = std::result::Result<T, E>>, {
1180        match result {
1181            Err(err) => Err(err),
1182            Ok(future) => future.await,
1183        }
1184    }
1185
1186    /// Appends data to ledger.
1187    pub fn append<'a>(&'a self, data: &'_ [u8]) -> impl Future<Output = Result<EntryId>> + Send + 'a {
1188        Self::wait(self.append_internally(data))
1189    }
1190
1191    fn append_internally<'a>(&'a self, data: &[u8]) -> Result<impl Future<Output = Result<EntryId>> + Send + 'a> {
1192        let (sender, receiver) = oneshot::channel();
1193        let request = WriteRequest::Append { entry_id: EntryId::INVALID, payload: data.to_vec(), responser: sender };
1194        if self.request_sender.send(request).is_err() {
1195            return Err(BkError::new(ErrorKind::LedgerClosed));
1196        }
1197        Ok(async move {
1198            let AddedEntry { entry_id, last_add_confirmed } = receiver.await.unwrap()?;
1199            self.update_last_add_entry_id(entry_id);
1200            self.update_last_add_confirmed(last_add_confirmed);
1201            Ok(entry_id)
1202        })
1203    }
1204}