1use std::cmp::Ordering;
2use std::collections::{HashSet, VecDeque};
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use ignore_result::Ignore;
11use static_assertions::assert_impl_all;
12use tokio::select;
13use tokio::sync::{mpsc, oneshot};
14use tokio::time::{self, Sleep};
15
16use super::bookie::{self, PoolledClient};
17use super::digest::Algorithm as DigestAlgorithm;
18use super::entry_distribution::{AckSet, EntryDistribution, HasEntryDistribution, WriteSet};
19use super::errors::{BkError, ErrorKind};
20use super::local_rc::LocalRc;
21use super::metadata::{
22 AtomicEntryId,
23 BookieId,
24 EntryId,
25 HasLedgerMetadata,
26 LedgerEnsemble,
27 LedgerId,
28 LedgerLength,
29 LedgerMetadata,
30 LedgerState,
31};
32use super::placement::{EnsembleOptions, PlacementPolicy, RandomPlacementPolicy};
33use super::reader::LedgerReader;
34use crate::future::{SelectAll, SelectIterable};
35use crate::meta::{MetaStore, MetaVersion, Versioned};
36
37type Result<T> = std::result::Result<T, BkError>;
38
/// Options sent along with every add-entry write issued to a bookie.
#[derive(Clone, Copy, Debug)]
pub struct AddOptions {
    /// Set from the writer's `recovery` flag (ledger was `InRecovery` when the writer started).
    recovery_add: bool,
    /// Always `false` for writes issued in this module.
    high_priority: bool,
    /// Writer's last-add-confirmed at the time the entry was appended.
    last_add_confirmed: EntryId,
    /// Ledger length including the payload of the entry being written.
    ledger_length: LedgerLength,
}
46
/// Options for closing a ledger; currently carries no settings.
#[derive(Default)]
pub struct CloseOptions {}
50
/// Configuration used when creating a ledger writer.
pub(crate) struct WriterOptions {
    /// When true, acknowledged entries do not advance last-add-confirmed; it is
    /// advanced by explicit forces (see `LedgerWriter::deferred_sync`).
    pub deferred_sync: bool,
    /// Ledger master key (20 bytes).
    pub master_key: [u8; 20],
    /// Digest algorithm used for entries.
    pub digest_algorithm: DigestAlgorithm,
}
56
57struct AddEntryFuture<F: Future> {
58 entry_id: EntryId,
59 payload: Vec<u8>,
60 recovery: bool,
61 last_add_confirmed: EntryId,
62 ledger_length: LedgerLength,
63 write_set: WriteSet,
64 ack_set: AckSet,
65 write_futures: Vec<Fuse<F>>,
66 responser: Option<oneshot::Sender<Result<AddedEntry>>>,
67}
68
69impl<F: Future> AddEntryFuture<F> {
70 fn to_bookie_index(&self, write_index: usize) -> usize {
71 self.write_set.bookie_index(write_index)
72 }
73
74 fn start_write<'a, 'b, AddEntryFn>(&'a mut self, bookies: &[BookieId], add_entry: &AddEntryFn)
75 where
76 AddEntryFn: Fn(BookieId, EntryId, &'b [u8], AddOptions) -> F, {
77 let options = AddOptions {
78 last_add_confirmed: self.last_add_confirmed,
79 ledger_length: self.ledger_length,
80 recovery_add: self.recovery,
81 high_priority: false,
82 };
83 let payload = unsafe { std::mem::transmute::<&[u8], &'_ [u8]>(self.payload.as_slice()) };
84 for bookie_index in self.write_set.iter() {
85 let bookie_id = bookies[bookie_index].clone();
86 self.write_futures.push(add_entry(bookie_id, self.entry_id, payload, options).fuse());
87 }
88 }
89
90 fn update_write<'a, 'b, AddEntryFn>(&'a mut self, changed: &[bool], bookies: &[BookieId], add_entry: &AddEntryFn)
91 where
92 AddEntryFn: Fn(BookieId, EntryId, &'b [u8], AddOptions) -> F, {
93 let options = AddOptions {
94 last_add_confirmed: self.last_add_confirmed,
95 ledger_length: self.ledger_length,
96 recovery_add: self.recovery,
97 high_priority: false,
98 };
99 let payload = unsafe { std::mem::transmute::<&[u8], &'_ [u8]>(self.payload.as_slice()) };
100 for (write_index, bookie_index) in self.write_set.iter().enumerate() {
101 if !changed[bookie_index] {
102 continue;
103 }
104 let bookie_id = bookies[bookie_index].clone();
105 self.write_futures[write_index] = add_entry(bookie_id, self.entry_id, payload, options).fuse();
106 self.ack_set.unset_write(write_index);
107 }
108 }
109
110 fn terminate_write(&mut self, changed: &[bool]) {
111 for (write_index, bookie_index) in self.write_set.iter().enumerate() {
112 if changed[bookie_index] {
113 self.write_futures[write_index] = Fuse::terminated();
114 }
115 }
116 }
117}
118
// The write futures live in the heap buffer of a `Vec`, so moving an
// `AddEntryFuture` does not move them. NOTE(review): soundness also depends
// on that `Vec` not reallocating while futures are pinned — confirm.
impl<F: Future> Unpin for AddEntryFuture<F> {}
// SAFETY: NOTE(review): this asserts `Send` for every `F`, including non-`Send`
// write futures, and the struct holds lifetime-widened payload borrows.
// Soundness relies on how callers move it across threads — confirm.
unsafe impl<F: Future> Send for AddEntryFuture<F> {}
121
122impl<F: Future> Future for AddEntryFuture<F> {
123 type Output = (usize, F::Output);
124
125 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 let mut iter = SelectIterable::new(&mut self.write_futures);
127 let pinned = unsafe { Pin::new_unchecked(&mut iter) };
128 pinned.poll(cx)
129 }
130}
131
132impl<F: Future> FusedFuture for AddEntryFuture<F> {
133 fn is_terminated(&self) -> bool {
134 return self.write_futures.iter().all(|f| f.is_terminated());
135 }
136}
137
/// Shared, immutable context of a ledger writer used by the write loop.
pub(crate) struct LedgerWriter {
    pub ledger_id: LedgerId,
    /// Bookie client; `prepare_ensemble` is called on it after ensemble changes.
    pub client: Arc<PoolledClient>,
    pub entry_distribution: EntryDistribution,
    /// Used to pick replacement bookies for failed ones.
    pub placement_policy: Arc<RandomPlacementPolicy>,
    /// Where ledger metadata updates (ensemble changes, close) are written.
    pub meta_store: Arc<dyn MetaStore>,
    pub master_key: [u8; 20],
    pub digest_algorithm: DigestAlgorithm,

    /// When true, acknowledged entries do not advance last-add-confirmed;
    /// it advances only through ledger forces.
    pub deferred_sync: bool,
}
149
/// Progress of the close started when every request sender is dropped.
enum Termination {
    /// No termination requested.
    None,
    /// Close started, not finished yet.
    Terminating,
    /// Close finished successfully.
    Terminated,
    /// Close failed with this error.
    Error(BkError),
}
156
157impl Termination {
158 fn next(&mut self) {
159 if matches!(self, Termination::Terminating) {
160 *self = Termination::Terminated;
161 }
162 }
163
164 fn next_with_err(&mut self, err: BkError) {
165 if matches!(self, Termination::Terminating) {
166 *self = Termination::Error(err);
167 }
168 }
169
170 fn terminated(&self) -> bool {
171 matches!(self, Termination::Terminated | Termination::Error(_))
172 }
173
174 fn err(&self) -> Option<&BkError> {
175 match self {
176 Termination::Error(err) => Some(err),
177 _ => None,
178 }
179 }
180}
181
/// Mutable state of the ledger write loop, generic over the asynchronous
/// operations it drives so they can be supplied as closures.
struct WriterState<
    'a,
    AddFuture,
    EnsembleFuture,
    ForceFuture,
    CloseFuture,
    LacFuture,
    EntryWriter,
    LedgerForcer,
    LedgerCloser,
    EnsembleChanger,
    LacFlusher,
> where
    AddFuture: Future,
    EnsembleFuture: Future,
    ForceFuture: Future,
    CloseFuture: Future,
    LacFuture: Future,
    // The `'static` payload is really a lifetime-widened borrow of the entry's
    // own payload buffer (see `AddEntryFuture::start_write`).
    EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
    LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
    LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
    EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
    LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture, {
    /// Ledger state as known to the writer.
    state: LedgerState,
    /// Ledger was `InRecovery` at start; ensemble changes are then kept locally.
    recovery: bool,
    /// Ensembles chosen locally in recovery mode; handed to the ledger closer.
    recovery_ensembles: Vec<LedgerEnsemble>,

    /// First fatal error; once set, appends are rejected with it.
    fatal: Option<BkError>,
    /// A close has been requested and not finished.
    closing: bool,
    /// Progress of the close triggered by all request senders being dropped.
    termination: Termination,
    close_future: Fuse<CloseFuture>,
    close_waiters: Vec<oneshot::Sender<Result<Versioned<LedgerMetadata>>>>,
    ledger_closer: LedgerCloser,

    writer: &'a LedgerWriter,
    /// Latest ledger metadata known to the writer.
    metadata: LocalRc<Versioned<LedgerMetadata>>,
    /// Bookies currently written to.
    write_ensemble: LedgerEnsemble,
    /// Scratch flags per ensemble position, set while applying a new ensemble.
    changed_bookies: Box<[bool]>,

    /// Capacity hint for per-entry write future vectors.
    write_quorum_size: usize,

    /// Id assigned to the most recently appended entry.
    last_adding_entry_id: EntryId,
    /// Id of the most recent entry whose ack set completed.
    last_add_entry_id: EntryId,
    last_add_confirmed: EntryId,
    /// Ledger length up to `last_add_entry_id`.
    last_add_ledger_length: LedgerLength,
    /// Ledger length including appended but not yet acknowledged entries.
    ledger_length: LedgerLength,

    /// In-flight entries not yet acknowledged, in entry id order.
    adding_entries: VecDeque<AddEntryFuture<AddFuture>>,
    /// Acknowledged entries whose writes to remaining bookies are still running.
    confirmed_entries: VecDeque<AddEntryFuture<AddFuture>>,
    /// Cleared write-future vectors kept for reuse.
    released_futures: Vec<Vec<Fuse<AddFuture>>>,

    force_future: Fuse<ForceFuture>,
    /// Entry id the in-flight force was issued for.
    force_entry_id: EntryId,
    pending_forces: VecDeque<PendingForce>,
    ledger_forcer: LedgerForcer,

    /// Failure flags per position of `write_ensemble`.
    failed_bookies: Vec<bool>,
    has_failed_bookies: bool,
    ensemble_changing: Fuse<EnsembleFuture>,
    ensemble_changer: EnsembleChanger,

    lac_future: Fuse<LacFuture>,
    lac_timeout: Fuse<Sleep>,
    /// Last LAC value reported as flushed.
    lac_flushed: EntryId,
    lac_flusher: LacFlusher,
    /// LAC flush period; zero disables the flush timer.
    lac_duration: Duration,

    entry_writer: EntryWriter,
}
251
252impl<
253 AddFuture,
254 EnsembleFuture,
255 ForceFuture,
256 CloseFuture,
257 LacFuture,
258 EntryWriter,
259 LedgerForcer,
260 LedgerCloser,
261 EnsembleChanger,
262 LacFlusher,
263 >
264 WriterState<
265 '_,
266 AddFuture,
267 EnsembleFuture,
268 ForceFuture,
269 CloseFuture,
270 LacFuture,
271 EntryWriter,
272 LedgerForcer,
273 LedgerCloser,
274 EnsembleChanger,
275 LacFlusher,
276 >
277where
278 AddFuture: Future<Output = Result<()>>,
279 EnsembleFuture: Future<Output = Result<Versioned<LedgerMetadata>>>,
280 ForceFuture: Future<Output = Result<EntryId>>,
281 CloseFuture: Future<Output = Result<Versioned<LedgerMetadata>>>,
282 LacFuture: Future<Output = EntryId>,
283 EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
284 LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
285 LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
286 EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
287 LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture,
288{
289 async fn run_once(&mut self) -> Option<Versioned<LedgerMetadata>> {
290 self.check_ensemble();
291 futures::select! {
292 (entry_index, (write_index, r)) = SelectIterable::next(&mut self.adding_entries).fuse() => {
293 self.complete_add(entry_index, write_index, r);
294 },
295 (entry_index, (write_index, r)) = SelectIterable::next(&mut self.confirmed_entries).fuse() => {
296 self.complete_confirmed(entry_index, write_index, r);
297 },
298 r = unsafe {Pin::new_unchecked(&mut self.ensemble_changing)} => {
299 let new_metadata = match r {
300 Err(err) => {
301 self.on_fatal_error(err);
302 return None;
303 },
304 Ok(new_metadata) => new_metadata,
305 };
306 self.complete_ensemble_change(new_metadata.clone());
307 return Some(new_metadata);
308 },
309 r = unsafe { Pin::new_unchecked(&mut self.force_future) } => {
310 match r {
311 Err(err) => self.fail_ledger_force(err),
312 Ok(last_add_confirmed) => self.complete_ledger_force(last_add_confirmed),
313 }
314 },
315 r = unsafe { Pin::new_unchecked(&mut self.close_future) } => {
316 match r {
317 Err(err) => self.fail_ledger_close(err),
318 Ok(metadata) => {
319 self.complete_ledger_close(metadata.clone());
320 return Some(metadata);
321 },
322 }
323 },
324 lac = unsafe { Pin::new_unchecked(&mut self.lac_future) } => {
325 self.lac_flushed = lac;
326 },
327 _ = unsafe { Pin::new_unchecked(&mut self.lac_timeout) } => {
328 self.on_lac_timeout();
329 },
330 }
331 None
332 }
333
334 fn check_ensemble(&mut self) {
335 if !self.has_failed_bookies || self.fatal.is_some() || self.last_add_entry_id != self.last_add_confirmed {
336 return;
337 }
338 if self.recovery {
339 let bookies = match self.writer.select_ensemble(&self.metadata.value, &self.failed_bookies) {
340 Err(_) => {
341 return;
342 },
343 Ok(bookies) => bookies,
344 };
345 let first_entry_id = self.last_add_confirmed + 1;
346 let ensemble = LedgerEnsemble { first_entry_id, bookies };
347 let replace = self.recovery_ensembles.last().map(|e| e.first_entry_id == first_entry_id).unwrap_or(false);
348 if replace {
349 self.recovery_ensembles.pop();
350 }
351 self.update_write_ensemble(&ensemble.bookies);
352 self.recovery_ensembles.push(ensemble);
353 return;
354 }
355 if self.ensemble_changing.is_terminated() {
356 self.ensemble_changing = (self.ensemble_changer)(
357 self.metadata.as_ref().clone(),
358 self.last_add_confirmed,
359 self.failed_bookies.clone(),
360 )
361 .fuse();
362 }
363 }
364
365 fn closed(&self) -> bool {
366 self.state == LedgerState::Closed
367 }
368
369 fn terminate(&mut self) {
370 self.termination = Termination::Terminating;
371 let (sender, _) = oneshot::channel();
372 self.close_ledger(sender);
373 }
374
375 fn terminated(&self) -> bool {
376 self.closed() || self.termination.terminated()
377 }
378
379 fn close_ledger(&mut self, responser: oneshot::Sender<Result<Versioned<LedgerMetadata>>>) {
380 if self.closed() {
381 let metadata = self.metadata.as_ref().clone();
382 responser.send(Ok(metadata)).ignore();
383 return;
384 } else if self.closing {
385 self.close_waiters.push(responser);
386 return;
387 }
388 self.closing = true;
389 self.close_waiters.push(responser);
390 self.confirmed_entries.clear();
391 self.ensemble_changing = Fuse::terminated();
392 self.has_failed_bookies = false;
393 self.check_pending_close();
394 }
395
396 fn check_pending_close(&mut self) {
397 if self.close_waiters.is_empty() || !self.adding_entries.is_empty() || !self.close_future.is_terminated() {
398 return;
399 }
400 if self.last_add_confirmed < self.last_add_entry_id {
401 self.start_ledger_force();
402 return;
403 }
404 let ensembles = self.recovery_ensembles.clone();
405 self.close_future = (self.ledger_closer)(
406 self.metadata.as_ref().clone(),
407 self.last_add_confirmed,
408 self.last_add_ledger_length,
409 self.recovery,
410 ensembles,
411 )
412 .fuse();
413 }
414
415 fn drain_ledger_close(&mut self, err: BkError) {
416 self.closing = false;
417 self.close_waiters.drain(..).for_each(|responser| {
418 responser.send(Err(err.clone())).ignore();
419 });
420 self.termination.next_with_err(err);
421 }
422
423 fn fail_ledger_close(&mut self, err: BkError) {
424 self.drain_ledger_close(err);
425 }
426
427 fn complete_ledger_close(&mut self, metadata: Versioned<LedgerMetadata>) {
428 self.state = LedgerState::Closed;
429 self.closing = false;
430 self.termination.next();
431 self.metadata = LocalRc::new(metadata.clone());
432 self.close_waiters.drain(..).for_each(|responser| {
433 responser.send(Ok(metadata.clone())).ignore();
434 });
435 }
436
437 fn check_append<R>(&self, responser: oneshot::Sender<Result<R>>) -> Option<oneshot::Sender<Result<R>>> {
438 if self.closed() {
439 responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore();
440 return None;
441 } else if let Some(err) = &self.fatal {
442 responser.send(Err(err.clone())).ignore();
443 return None;
444 } else if self.closing {
445 responser.send(Err(BkError::new(ErrorKind::LedgerClosing))).ignore();
446 return None;
447 }
448 Some(responser)
449 }
450
451 fn append_entry(
452 &mut self,
453 expected_entry_id: EntryId,
454 payload: Vec<u8>,
455 responser: oneshot::Sender<Result<AddedEntry>>,
456 ) {
457 let Some(responser) = self.check_append(responser) else {
458 return;
459 };
460 let entry_id = self.last_adding_entry_id + 1;
461 if expected_entry_id.is_valid() && expected_entry_id != entry_id {
462 let msg = format!("expect next entry id {}, got {}", entry_id, expected_entry_id);
463 let err = BkError::with_message(ErrorKind::UnexpectedError, msg);
464 responser.send(Err(err)).ignore();
465 return;
466 };
467 self.ledger_length += payload.len() as i64;
468 self.last_adding_entry_id = entry_id;
469 let mut add_entry_future = AddEntryFuture {
470 entry_id,
471 payload,
472 recovery: self.recovery,
473 last_add_confirmed: self.last_add_confirmed,
474 ledger_length: self.ledger_length,
475 write_set: self.new_write_set(entry_id),
476 ack_set: self.new_ack_set(),
477 write_futures: self.released_futures.pop().unwrap_or_else(|| Vec::with_capacity(self.write_quorum_size)),
478 responser: Some(responser),
479 };
480 add_entry_future.start_write(&self.write_ensemble.bookies, &self.entry_writer);
481 self.adding_entries.push_back(add_entry_future);
482 }
483
484 fn is_terminal_error(err: ErrorKind) -> bool {
485 matches!(err, ErrorKind::LedgerFenced | ErrorKind::UnauthorizedAccess)
486 }
487
488 fn on_fatal_error(&mut self, err: BkError) {
489 if self.fatal.is_some() {
490 return;
491 }
492 self.abort_write(&err);
493 self.fatal = Some(err);
494 }
495
496 fn abort_write(&mut self, err: &BkError) {
497 while let Some(entry_future) = self.adding_entries.pop_front() {
498 let responser = entry_future.responser.unwrap();
499 responser.send(Err(err.clone())).ignore();
500 }
501 self.confirmed_entries.clear();
502 self.released_futures.clear();
503 self.ensemble_changing = Fuse::terminated();
504 }
505
506 fn complete_add(&mut self, entry_index: usize, write_index: usize, result: Result<()>) {
507 if let Err(err) = result {
508 if Self::is_terminal_error(err.kind()) {
509 self.on_fatal_error(err);
510 return;
511 }
512 let entry_future = &mut self.adding_entries[entry_index];
513 let bookie_index = entry_future.to_bookie_index(write_index);
514 self.failed_bookies[bookie_index] = true;
515 self.has_failed_bookies = true;
516 return;
517 }
518 let entry_future = &mut self.adding_entries[entry_index];
519 let completed = entry_future.ack_set.complete_write(write_index);
520 if entry_index != 0 || !completed || !self.ensemble_changing.is_terminated() {
521 return;
522 }
523 self.confirm_acked_entries();
524 }
525
526 fn complete_confirmed(&mut self, entry_index: usize, write_index: usize, result: Result<()>) {
527 let entry_future = &self.confirmed_entries[entry_index];
528 if result.is_err() {
529 let bookie_index = entry_future.to_bookie_index(write_index);
530 self.failed_bookies[bookie_index] = true;
531 self.has_failed_bookies = true;
532 }
533 if entry_future.is_terminated() {
534 let AddEntryFuture { mut write_futures, .. } =
535 self.confirmed_entries.swap_remove_back(entry_index).unwrap();
536 write_futures.clear();
537 self.released_futures.push(write_futures);
538 }
539 }
540
541 fn confirm_acked_entries(&mut self) {
542 while let Some(entry_future) = self.adding_entries.front() {
543 if !entry_future.ack_set.completed() {
544 break;
545 }
546 let mut entry_future = self.adding_entries.pop_front().unwrap();
547 let responser = entry_future.responser.take().unwrap();
548 let added_entry = AddedEntry {
549 entry_id: entry_future.entry_id,
550 last_add_confirmed: if self.writer.deferred_sync {
551 self.last_add_confirmed
552 } else {
553 entry_future.entry_id
554 },
555 };
556 responser.send(Ok(added_entry)).ignore();
557 self.last_add_entry_id = entry_future.entry_id;
558 self.last_add_ledger_length += entry_future.payload.len();
559 if !entry_future.is_terminated() {
560 self.confirmed_entries.push_back(entry_future);
561 } else {
562 entry_future.write_futures.clear();
563 self.released_futures.push(entry_future.write_futures);
564 }
565 }
566 if !self.writer.deferred_sync {
567 self.update_last_add_confirmed(self.last_add_entry_id);
568 }
569 self.check_pending_close();
570 }
571
572 fn update_last_add_confirmed(&mut self, last_add_confirmed: EntryId) {
573 self.last_add_confirmed = last_add_confirmed;
574 if last_add_confirmed > self.lac_flushed && !self.lac_duration.is_zero() && self.lac_timeout.is_terminated() {
575 self.lac_timeout = time::sleep(self.lac_duration).fuse();
576 }
577 }
578
579 fn on_lac_timeout(&mut self) {
580 if self.last_add_confirmed == self.lac_flushed {
581 return;
582 }
583 if self.lac_future.is_terminated() && self.ensemble_changing.is_terminated() {
584 self.lac_future = (self.lac_flusher)(self.write_ensemble.bookies.clone(), self.last_add_confirmed).fuse();
585 }
586 self.lac_timeout = time::sleep(self.lac_duration).fuse();
587 }
588
589 fn force_ledger(&mut self, entry_id: EntryId, responser: oneshot::Sender<Result<EntryId>>) {
590 if self.last_add_confirmed == self.last_add_entry_id || self.last_add_confirmed >= entry_id {
591 responser.send(Ok(self.last_add_confirmed)).ignore();
592 return;
593 }
594 self.pending_forces.push_back(PendingForce { last_add_entry_id: self.last_add_entry_id, responser });
595 self.start_ledger_force();
596 }
597
598 fn force_last_added_entry(&mut self) {
599 self.force_future = (self.ledger_forcer)(self.write_ensemble.bookies.clone(), self.last_add_entry_id).fuse();
600 self.force_entry_id = self.last_add_entry_id;
601 }
602
603 fn start_ledger_force(&mut self) {
604 if self.force_future.is_terminated() {
605 self.force_last_added_entry();
606 }
607 }
608
609 fn fail_ledger_force(&mut self, err: BkError) {
610 self.pending_forces.drain(..).for_each(|force| {
611 force.responser.send(Err(err.clone())).ignore();
612 });
613 if self.adding_entries.is_empty() && self.force_entry_id == self.last_add_entry_id {
614 let err = BkError::new(ErrorKind::LedgerForceFailed).cause_by(err);
615 self.drain_ledger_close(err);
616 }
617 }
618
619 fn complete_ledger_force(&mut self, last_add_confirmed: EntryId) {
620 self.update_last_add_confirmed(last_add_confirmed);
621 while let Some(pending_force) = self.pending_forces.front() {
622 if pending_force.last_add_entry_id > self.last_add_confirmed {
623 self.force_last_added_entry();
624 return;
625 }
626 let pending_force = self.pending_forces.pop_front().unwrap();
627 pending_force.responser.send(Ok(self.last_add_confirmed)).ignore();
628 }
629 self.check_pending_close();
630 }
631
632 fn update_write_ensemble(&mut self, new_bookies: &[BookieId]) {
633 let bookies = &mut self.write_ensemble.bookies;
634 let changed_bookies = self.changed_bookies.as_mut();
635 self.has_failed_bookies = false;
636 for (i, bookie_id) in new_bookies.iter().enumerate() {
637 if bookies[i] == *bookie_id {
638 self.has_failed_bookies |= self.failed_bookies[i];
639 } else {
640 bookies[i] = bookie_id.clone();
641 changed_bookies[i] = true;
642 self.failed_bookies[i] = false;
643 }
644 }
645 self.on_write_ensemble_changed();
646 self.changed_bookies.fill(false);
647 }
648
649 fn on_write_ensemble_changed(&mut self) {
650 let bookies = &self.write_ensemble.bookies;
651 let changed_bookies = self.changed_bookies.as_ref();
652 let mut i = 0usize;
653 while i < self.confirmed_entries.len() {
654 let entry_future = &mut self.confirmed_entries[i];
655 entry_future.terminate_write(changed_bookies);
656 if !entry_future.is_terminated() {
657 i += 1;
658 continue;
659 }
660 let AddEntryFuture { mut write_futures, .. } = self.confirmed_entries.swap_remove_back(i).unwrap();
661 write_futures.clear();
662 self.released_futures.push(write_futures);
663 }
664 for entry_future in self.adding_entries.iter_mut() {
665 entry_future.update_write(changed_bookies, bookies, &self.entry_writer);
666 }
667 self.writer.client.prepare_ensemble(bookies.as_slice());
668 self.confirm_acked_entries();
669 }
670
671 fn complete_ensemble_change(&mut self, new_metadata: Versioned<LedgerMetadata>) {
672 let state = new_metadata.value.state;
673 if state == LedgerState::InRecovery {
674 let err = BkError::new(ErrorKind::LedgerFenced);
675 self.abort_write(&err);
676 self.fatal = Some(err.clone());
677 } else if state == LedgerState::Closed {
678 self.state = LedgerState::Closed;
679 self.abort_write(&BkError::new(ErrorKind::LedgerClosed));
680 } else {
681 let last_ensemble = new_metadata.last_ensemble();
682 self.update_write_ensemble(&last_ensemble.bookies);
683 }
684 self.metadata = LocalRc::new(new_metadata);
685 }
686}
687
688impl<
689 AddFuture,
690 EnsembleFuture,
691 ForceFuture,
692 CloseFuture,
693 LacFuture,
694 EntryWriter,
695 LedgerForcer,
696 LedgerCloser,
697 EnsembleChanger,
698 LacFlusher,
699 > HasEntryDistribution
700 for WriterState<
701 '_,
702 AddFuture,
703 EnsembleFuture,
704 ForceFuture,
705 CloseFuture,
706 LacFuture,
707 EntryWriter,
708 LedgerForcer,
709 LedgerCloser,
710 EnsembleChanger,
711 LacFlusher,
712 >
713where
714 AddFuture: Future,
715 EnsembleFuture: Future,
716 ForceFuture: Future,
717 CloseFuture: Future,
718 LacFuture: Future,
719 EntryWriter: Fn(BookieId, EntryId, &'static [u8], AddOptions) -> AddFuture,
720 LedgerForcer: Fn(Vec<BookieId>, EntryId) -> ForceFuture,
721 LedgerCloser: Fn(Versioned<LedgerMetadata>, EntryId, LedgerLength, bool, Vec<LedgerEnsemble>) -> CloseFuture,
722 EnsembleChanger: Fn(Versioned<LedgerMetadata>, EntryId, Vec<bool>) -> EnsembleFuture,
723 LacFlusher: Fn(Vec<BookieId>, EntryId) -> LacFuture,
724{
725 fn entry_distribution(&self) -> &EntryDistribution {
726 self.writer.entry_distribution()
727 }
728}
729
730impl HasEntryDistribution for LedgerWriter {
731 fn entry_distribution(&self) -> &EntryDistribution {
732 &self.entry_distribution
733 }
734}
735
736impl LedgerWriter {
737 pub(crate) async fn write_state_loop(
738 &self,
739 metadata: Versioned<LedgerMetadata>,
740 last_confirmed_entry_id: EntryId,
741 last_confirmed_ledger_length: LedgerLength,
742 mut request_receiver: mpsc::UnboundedReceiver<WriteRequest>,
743 metadata_sender: mpsc::Sender<Versioned<LedgerMetadata>>,
744 ) {
745 let ensemble_size = metadata.value.ensemble_size as usize;
746 let write_quorum_size = metadata.value.write_quorum_size as usize;
747 let last_ensemble = metadata.last_ensemble().clone();
748 let ledger_state = metadata.value.state;
749 let mut state = WriterState {
750 writer: self,
751 state: ledger_state,
752 recovery: ledger_state == LedgerState::InRecovery,
753 recovery_ensembles: Vec::new(),
754
755 fatal: None,
756 closing: false,
757 termination: Termination::None,
758 close_future: Fuse::terminated(),
759 close_waiters: Vec::with_capacity(5),
760 ledger_closer: |metadata, last_add_confirmed, ledger_length, recovery, ensembles| {
761 self.close_ledger(metadata, last_add_confirmed, ledger_length, recovery, ensembles)
762 },
763
764 metadata: LocalRc::new(metadata),
765 write_ensemble: last_ensemble,
766 changed_bookies: vec![false; ensemble_size].into_boxed_slice(),
767 write_quorum_size,
768
769 adding_entries: VecDeque::new(),
770 confirmed_entries: VecDeque::new(),
771 released_futures: Vec::new(),
772
773 last_adding_entry_id: last_confirmed_entry_id,
774 last_add_entry_id: last_confirmed_entry_id,
775 last_add_confirmed: last_confirmed_entry_id,
776 last_add_ledger_length: last_confirmed_ledger_length,
777 ledger_length: last_confirmed_ledger_length,
778
779 force_future: Fuse::terminated(),
780 force_entry_id: EntryId::INVALID,
781 pending_forces: VecDeque::new(),
782 ledger_forcer: |ensemble, last_add_entry_id| self.force_ledger(ensemble, last_add_entry_id),
783
784 failed_bookies: vec![false; ensemble_size],
785 has_failed_bookies: false,
786 ensemble_changing: Fuse::terminated(),
787
788 lac_future: Fuse::terminated(),
789 lac_timeout: Fuse::terminated(),
790 lac_flushed: last_confirmed_entry_id,
791 lac_flusher: |ensemble, last_add_confirmed| self.write_lac(ensemble, last_add_confirmed),
792 lac_duration: Duration::from_secs(1),
793
794 entry_writer: |bookie_id, entry_id, payload, options| self.add_entry(bookie_id, entry_id, payload, options),
795 ensemble_changer: |metadata: Versioned<LedgerMetadata>, last_add_confirmed, failed_bookies| {
796 self.change_ensemble(metadata.version, metadata.value, last_add_confirmed + 1, failed_bookies)
797 },
798 };
799
800 let mut channel_closed = false;
801 let mut metadata_sending = Fuse::terminated();
802 while !state.terminated() {
803 select! {
804 request = request_receiver.recv(), if !channel_closed => {
805 let Some(request) = request else {
806 channel_closed = true;
807 state.terminate();
808 continue;
809 };
810 match request {
811 WriteRequest::Append{entry_id, payload, responser} => {
812 state.append_entry(entry_id, payload, responser);
813 },
814 WriteRequest::Force{entry_id, responser} => {
815 state.force_ledger(entry_id, responser);
816 },
817 WriteRequest::Close{responser} => {
818 state.close_ledger(responser);
819 },
820 }
821 },
822 new_metadata = state.run_once() => {
823 if let Some(metadata) = new_metadata {
824 metadata_sending = metadata_sender.send(metadata).fuse();
825 }
826 },
827 _ = unsafe {Pin::new_unchecked(&mut metadata_sending)} => {},
828 }
829 }
830 if let Some(err) = state.termination.err() {
831 log::warn!("ledger {} all writers dropped, but ledger close failed: {:?}", self.ledger_id, err);
832 return;
833 }
834 request_receiver.close();
835 while let Some(request) = request_receiver.recv().await {
836 match request {
838 WriteRequest::Append { responser, .. } => {
839 responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
840 },
841 WriteRequest::Force { responser, .. } => {
842 responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
843 },
844 WriteRequest::Close { responser } => {
845 responser.send(Err(BkError::new(ErrorKind::LedgerClosed))).ignore()
846 },
847 }
848 }
849 }
850
851 fn select_ensemble(&self, metadata: &LedgerMetadata, failed_bookies: &[bool]) -> Result<Vec<BookieId>> {
852 let ensemble = metadata.last_ensemble();
853 let ensemble_size = ensemble.bookies.len();
854 let mut preferred_bookies = Vec::with_capacity(ensemble_size);
855 let mut excluded_bookies = HashSet::with_capacity(ensemble_size);
856 for (i, bookie_id) in ensemble.bookies.iter().enumerate() {
857 if failed_bookies[i] {
858 excluded_bookies.insert(bookie_id);
859 } else {
860 preferred_bookies.push(bookie_id);
861 }
862 }
863 if excluded_bookies.is_empty() {
864 return Ok(ensemble.bookies.to_vec());
865 }
866 let options = EnsembleOptions {
867 ensemble_size: metadata.ensemble_size,
868 write_quorum: metadata.write_quorum_size,
869 ack_quorum: metadata.ack_quorum_size,
870 custom_metadata: &metadata.custom_metadata,
871 preferred_bookies: &preferred_bookies,
872 excluded_bookies: excluded_bookies.clone(),
873 };
874 let mut selected_bookies = self.placement_policy.select_ensemble(&options)?;
875 let mut ordered_bookies = Vec::with_capacity(ensemble_size);
876 for bookie_id in preferred_bookies.into_iter() {
877 if let Some(j) = selected_bookies.iter().position(|id| id == bookie_id) {
878 ordered_bookies.push(selected_bookies.swap_remove(j));
879 }
880 }
881 for (i, bookie_id) in ensemble.bookies.iter().enumerate() {
882 if !failed_bookies[i] && i < ordered_bookies.len() && *bookie_id == ordered_bookies[i] {
883 continue;
884 }
885 ordered_bookies.insert(i, selected_bookies.swap_remove(0));
886 }
887 Ok(ordered_bookies)
888 }
889
890 async fn change_ensemble(
891 &self,
892 mut version: MetaVersion,
893 mut metadata: LedgerMetadata,
894 first_entry_id: EntryId,
895 failed_bookies: Vec<bool>,
896 ) -> Result<Versioned<LedgerMetadata>> {
897 let mut failed_ensemble_container = None;
898 let original_bookies =
899 unsafe { std::mem::transmute::<&[BookieId], &'_ [BookieId]>(metadata.last_ensemble().bookies.as_slice()) };
900 let ensemble_size = metadata.ensemble_size as usize;
901 let mut excluded_bookies = HashSet::with_capacity(ensemble_size);
902 for (i, bookie_id) in original_bookies.iter().enumerate() {
903 if failed_bookies[i] {
904 excluded_bookies.insert(bookie_id);
905 }
906 }
907 let mut preferred_bookies = Vec::with_capacity(ensemble_size);
908 let mut ordered_bookies = Vec::with_capacity(ensemble_size);
909 loop {
910 if excluded_bookies.is_empty() {
911 return Ok(Versioned::new(version, metadata));
912 }
913 let ensemble = metadata.last_ensemble();
914 if first_entry_id < ensemble.first_entry_id {
915 let err = BkError::with_description(ErrorKind::UnexpectedError, &"ledger ensemble changed");
916 return Err(err);
917 }
918 let preferred_bookies =
919 unsafe { std::mem::transmute::<&mut Vec<&BookieId>, &'_ mut Vec<&BookieId>>(&mut preferred_bookies) };
920 preferred_bookies.extend(ensemble.bookies.iter().filter(|bookie_id| !excluded_bookies.contains(bookie_id)));
921 let options = EnsembleOptions {
922 ensemble_size: metadata.ensemble_size,
923 write_quorum: metadata.write_quorum_size,
924 ack_quorum: metadata.ack_quorum_size,
925 custom_metadata: &metadata.custom_metadata,
926 preferred_bookies: preferred_bookies.as_slice(),
927 excluded_bookies: excluded_bookies.clone(),
928 };
929 let mut selected_bookies = self.placement_policy.select_ensemble(&options)?;
930 for bookie_id in preferred_bookies.iter() {
931 if let Some(i) = selected_bookies.iter().position(|id| id == *bookie_id) {
932 ordered_bookies.push(selected_bookies.swap_remove(i));
933 }
934 }
935 preferred_bookies.clear();
936 let mut i = 0;
937 while i < ordered_bookies.len() {
938 if ordered_bookies[i] != ensemble.bookies[i] {
939 ordered_bookies.insert(i, selected_bookies.pop().unwrap());
940 }
941 i += 1;
942 }
943 ordered_bookies.append(&mut selected_bookies);
944
945 if first_entry_id == ensemble.first_entry_id {
946 let replaced_ensemble = metadata.ensembles.pop();
947 failed_ensemble_container = failed_ensemble_container.or(replaced_ensemble);
948 }
949 metadata.ensembles.push(LedgerEnsemble { first_entry_id, bookies: ordered_bookies });
950 match self.meta_store.write_ledger_metadata(&metadata, version).await? {
951 either::Right(new_version) => return Ok(Versioned::new(new_version, metadata)),
952 either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
953 ordered_bookies = metadata.ensembles.pop().unwrap().bookies;
954 ordered_bookies.clear();
955 version = conflicting_version;
956 metadata = conflicting_metadata;
957 },
958 }
959 if metadata.state != LedgerState::Open {
960 return Ok(Versioned::new(version, metadata));
961 }
962 }
963 }
964
965 async fn close_ledger(
966 &self,
967 metadata: Versioned<LedgerMetadata>,
968 last_add_confirmed: EntryId,
969 ledger_length: LedgerLength,
970 recovery: bool,
971 ensembles: Vec<LedgerEnsemble>,
972 ) -> Result<Versioned<LedgerMetadata>> {
973 let Versioned { mut version, value: mut metadata } = metadata;
974 loop {
975 if recovery {
976 if metadata.state != LedgerState::InRecovery {
977 let err = BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger not in recovery");
978 return Err(err);
979 }
980 } else if metadata.state == LedgerState::InRecovery {
981 let err = BkError::with_description(ErrorKind::LedgerConcurrentClose, &"ledger in recovery");
982 return Err(err);
983 } else if metadata.state == LedgerState::Closed {
984 if metadata.last_entry_id == last_add_confirmed && metadata.length == ledger_length {
985 return Ok(Versioned::new(version, metadata));
986 }
987 let msg = format!(
988 "attemp to close ledger with (last_entry_id, length) ({}, {}), but got ({}, {})",
989 last_add_confirmed, ledger_length, metadata.last_entry_id, metadata.length
990 );
991 let err = BkError::with_message(ErrorKind::LedgerConcurrentClose, msg);
992 return Err(err);
993 }
994 if !ensembles.is_empty() {
995 let last_ensemble_entry_id = metadata.last_ensemble().first_entry_id;
996 match last_ensemble_entry_id.cmp(&ensembles[0].first_entry_id) {
997 Ordering::Greater => {
998 let err = BkError::with_description(
999 ErrorKind::UnexpectedError,
1000 &"ledger ensembles changed in recovery mode",
1001 );
1002 return Err(err);
1003 },
1004 Ordering::Less => {
1005 metadata.ensembles.extend_from_slice(&ensembles);
1006 },
1007 Ordering::Equal => {
1008 metadata.ensembles.pop();
1009 },
1010 }
1011 }
1012 metadata.state = LedgerState::Closed;
1013 metadata.last_entry_id = last_add_confirmed;
1014 metadata.length = ledger_length;
1015 let r = self.meta_store.write_ledger_metadata(&metadata, version).await?;
1016 match r {
1017 either::Right(version) => return Ok(Versioned::new(version, metadata)),
1018 either::Left(Versioned { version: conflicting_version, value: conflicting_metadata }) => {
1019 version = conflicting_version;
1020 metadata = conflicting_metadata;
1021 },
1022 }
1023 }
1024 }
1025
1026 async fn write_lac(&self, bookies: Vec<BookieId>, lac: EntryId) -> EntryId {
1027 let options =
1028 bookie::WriteLacOptions { master_key: &self.master_key, digest_algorithm: &self.digest_algorithm };
1029 let n = bookies.len();
1030 let mut futures = Vec::with_capacity(n);
1031 let write_set = self.new_write_set(lac);
1032 for bookie_index in write_set.iter() {
1033 let bookie_id = &bookies[bookie_index];
1034 futures.push(self.client.write_lac(bookie_id, self.ledger_id, lac, &options).fuse());
1035 }
1036 let mut ack_set = self.new_ack_set();
1037 let mut failed = 0;
1038 let mut select_all = SelectAll::new(&mut futures);
1039 let mut err = None;
1040 while failed <= self.entry_distribution.failure_threshold() {
1041 let (write_index, write_result) = select_all.next().await;
1042 if let Err(e) = write_result {
1043 err = err.or(Some(e));
1044 failed += 1;
1045 } else if ack_set.complete_write(write_index) {
1046 return lac;
1047 }
1048 }
1049 lac
1050 }
1051
1052 async fn force_ledger(&self, bookies: Vec<BookieId>, last_add_entry_id: EntryId) -> Result<EntryId> {
1053 let mut futures = Vec::with_capacity(bookies.len());
1054 for bookie_id in bookies.iter() {
1055 futures.push(self.client.force_ledger(bookie_id, self.ledger_id).fuse());
1056 }
1057 let mut select_all = SelectAll::new(&mut futures);
1058 for _ in 0..bookies.len() {
1059 select_all.next().await.1?;
1060 }
1061 Ok(last_add_entry_id)
1062 }
1063
1064 async fn add_entry(
1065 &self,
1066 bookie_id: BookieId,
1067 entry_id: EntryId,
1068 payload: &'static [u8],
1069 options: AddOptions,
1070 ) -> Result<()> {
1071 let adding_entry = bookie::AddingEntry {
1072 ledger_id: self.ledger_id,
1073 entry_id,
1074 last_add_confirmed: options.last_add_confirmed,
1075 accumulated_ledger_length: options.ledger_length,
1076 payload,
1077 };
1078 let add_options = bookie::AddOptions {
1079 recovery_add: options.recovery_add,
1080 high_priority: options.high_priority,
1081 deferred_sync: self.deferred_sync,
1082 master_key: &self.master_key,
1083 digest_algorithm: &self.digest_algorithm,
1084 };
1085 self.client.add_entry(&bookie_id, &adding_entry, &add_options).await?;
1086 Ok(())
1087 }
1088}
1089
/// Handle used to append entries to a ledger.
///
/// Append/force/close operations are not executed here. They are sent as [`WriteRequest`]s
/// over `request_sender` and completed through oneshot responses. NOTE(review): the consumer
/// of that channel is presumably a background writer task; confirm.
#[derive(Clone)]
pub struct LedgerAppender {
    // Reader for the same ledger. Its LAC (`update_lac`) and metadata (`update_metadata`) are
    // refreshed as append/force/close responses arrive.
    pub(crate) reader: LedgerReader,
    // Last entry id reported back by an append response; `Arc`, so shared by all clones.
    pub(crate) last_add_entry_id: Arc<AtomicEntryId>,
    // Channel through which write requests are submitted.
    pub(crate) request_sender: mpsc::UnboundedSender<WriteRequest>,
}

// Compile-time check that the appender can be sent to and shared between threads.
assert_impl_all!(LedgerAppender: Send, Sync);
1099
/// Successful response to a [`WriteRequest::Append`].
#[derive(Debug)]
pub struct AddedEntry {
    // Entry id under which the payload was appended (the request itself carries
    // `EntryId::INVALID` when sent by `LedgerAppender::append`).
    entry_id: EntryId,
    // Last-add-confirmed value reported together with the append.
    last_add_confirmed: EntryId,
}
1105
/// Request submitted by a [`LedgerAppender`] through its `request_sender` channel.
/// Each variant carries a oneshot `responser` on which the result is delivered.
#[derive(Debug)]
pub enum WriteRequest {
    // Append `payload`. `LedgerAppender::append` sends `EntryId::INVALID` as `entry_id`; the
    // response reports the entry id actually used and the last-add-confirmed.
    Append { entry_id: EntryId, payload: Vec<u8>, responser: oneshot::Sender<Result<AddedEntry>> },
    // Force entries up to `entry_id` (the appender's last added entry id). The response is
    // used by the appender as its new last-add-confirmed.
    Force { entry_id: EntryId, responser: oneshot::Sender<Result<EntryId>> },
    // Close the ledger. The response metadata is installed into the appender's reader.
    Close { responser: oneshot::Sender<Result<Versioned<LedgerMetadata>>> },
}
1112
/// NOTE(review): not used in this part of the file. Presumably a [`WriteRequest::Force`] whose
/// reply is deferred until entries up to `last_add_entry_id` are confirmed; confirm against the
/// writer loop.
#[derive(Debug)]
struct PendingForce {
    // Entry id the force request must cover.
    last_add_entry_id: EntryId,
    // Where the force result is delivered.
    responser: oneshot::Sender<Result<EntryId>>,
}
1118
1119impl LedgerAppender {
1120 pub fn id(&self) -> LedgerId {
1121 self.reader.ledger_id
1122 }
1123
1124 pub async fn close(&mut self, _options: CloseOptions) -> Result<()> {
1126 let (sender, receiver) = oneshot::channel();
1127 let request = WriteRequest::Close { responser: sender };
1128 if self.request_sender.send(request).is_err() {
1129 return Err(BkError::new(ErrorKind::LedgerClosed));
1130 }
1131 let metadata = receiver.await.unwrap()?;
1132 self.reader.update_metadata(metadata);
1133 Ok(())
1134 }
1135
1136 pub fn reader(&self) -> Result<LedgerReader> {
1138 Ok(self.reader.clone())
1139 }
1140
1141 fn last_add_entry_id(&self) -> EntryId {
1142 self.last_add_entry_id.get()
1143 }
1144
1145 pub fn last_add_confirmed(&self) -> EntryId {
1147 self.reader.last_add_confirmed()
1148 }
1149
1150 fn update_last_add_entry_id(&self, entry_id: EntryId) -> EntryId {
1151 self.last_add_entry_id.update(entry_id)
1152 }
1153
1154 fn update_last_add_confirmed(&self, last_add_confirmed: EntryId) -> EntryId {
1155 self.reader.metadata.update_lac(last_add_confirmed)
1156 }
1157
1158 pub async fn force(&self) -> Result<()> {
1163 let last_add_entry_id = self.last_add_entry_id();
1164 if last_add_entry_id <= self.last_add_confirmed() {
1165 return Ok(());
1166 }
1167 let (sender, receiver) = oneshot::channel();
1168 let request = WriteRequest::Force { entry_id: last_add_entry_id, responser: sender };
1169 if self.request_sender.send(request).is_err() {
1170 return Err(BkError::new(ErrorKind::LedgerClosed));
1171 }
1172 let last_add_confirmed = receiver.await.unwrap()?;
1173 self.update_last_add_confirmed(last_add_confirmed);
1174 Ok(())
1175 }
1176
1177 async fn wait<T, E, F>(result: std::result::Result<F, E>) -> std::result::Result<T, E>
1178 where
1179 F: Future<Output = std::result::Result<T, E>>, {
1180 match result {
1181 Err(err) => Err(err),
1182 Ok(future) => future.await,
1183 }
1184 }
1185
1186 pub fn append<'a>(&'a self, data: &'_ [u8]) -> impl Future<Output = Result<EntryId>> + Send + 'a {
1188 Self::wait(self.append_internally(data))
1189 }
1190
1191 fn append_internally<'a>(&'a self, data: &[u8]) -> Result<impl Future<Output = Result<EntryId>> + Send + 'a> {
1192 let (sender, receiver) = oneshot::channel();
1193 let request = WriteRequest::Append { entry_id: EntryId::INVALID, payload: data.to_vec(), responser: sender };
1194 if self.request_sender.send(request).is_err() {
1195 return Err(BkError::new(ErrorKind::LedgerClosed));
1196 }
1197 Ok(async move {
1198 let AddedEntry { entry_id, last_add_confirmed } = receiver.await.unwrap()?;
1199 self.update_last_add_entry_id(entry_id);
1200 self.update_last_add_confirmed(last_add_confirmed);
1201 Ok(entry_id)
1202 })
1203 }
1204}