1use std::collections::{HashMap, VecDeque};
2use std::convert::Into;
3use std::io;
4use std::mem::MaybeUninit;
5use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex, RwLock};
7use std::time::Duration;
8
9use bytes::buf::{Buf, BufMut};
10use futures::future::FutureExt;
11use ignore_result::Ignore;
12use prost::Message;
13use tokio::net::{TcpStream, ToSocketAddrs};
14use tokio::select;
15use tokio::sync::mpsc;
16use tokio::sync::oneshot::{self, Sender};
17
18use super::digest::traits::{Algorithm as _, Digester as _};
19use super::digest::{Algorithm as DigestAlgorithm, Digester};
20use super::errors::{BkError, ErrorKind};
21use super::metadata::{BookieId, EntryId, LedgerId, LedgerLength};
22use crate::future::SelectIterable;
23use crate::meta::util::{BookieRegistry, BookieRegistrySnapshot};
24use crate::proto::*;
25
/// Result type used throughout the bookie client.
pub type Result<T> = std::result::Result<T, BkError>;

/// Entry id placed in read requests to ask for the last-add-confirmed entry instead of a
/// concrete one (used by `Client::read_last_entry` and `Client::poll_entry`); it has the same
/// value as `EntryId::INVALID`.
static LAST_ADD_CONFIRMED: EntryId = EntryId::INVALID;

/// Size of the entry header written by `Client::add_entry`: ledger id, entry id, last add
/// confirmed and accumulated ledger length, each a big-endian `i64`.
const ENTRY_METADATA_LENGTH: usize = 32;

/// Size of the explicit-LAC header written by `Client::write_lac`: ledger id and LAC, each a
/// big-endian `i64`.
const LAC_METADATA_LENGTH: usize = 16;

/// `write_flags` value set on add requests when `AddOptions::deferred_sync` is requested.
const DEFERRED_WRITE: i32 = 1;
/// Raw entry id substituted when a read response carries no `max_lac`.
const INVALID_ENTRY_ID: i64 = -1;

/// Capacity of the scratch buffer used to verify digests; a digester reporting a longer
/// digest length would make the verification slice out of bounds.
const MAX_DIGEST_LENGTH: usize = 20;
40
/// Per-connection transaction id; responses are routed back to their requester by matching
/// the id echoed in the response header.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
struct TxnId(u64);

impl From<u64> for TxnId {
    fn from(raw: u64) -> Self {
        Self(raw)
    }
}

impl From<TxnId> for u64 {
    fn from(id: TxnId) -> Self {
        let TxnId(raw) = id;
        raw
    }
}
55
/// Outcome of a long-poll read issued by `Client::poll_entry`.
pub struct PolledEntry {
    /// Last add confirmed reported by the response; when an entry is returned this is the
    /// larger of the response's `max_lac` and the LAC piggybacked in that entry.
    pub last_add_confirmed: EntryId,
    /// Payload of the polled entry, or `None` when the response's entry id is the
    /// `LAST_ADD_CONFIRMED` sentinel (no entry was piggybacked).
    pub payload: Option<Vec<u8>>,
}
61
62fn response_status_to_error(status: i32) -> BkError {
63 if status == StatusCode::Enoledger as i32 {
64 BkError::new(ErrorKind::LedgerNotExisted)
65 } else if status == StatusCode::Enoentry as i32 {
66 BkError::new(ErrorKind::EntryNotExisted)
67 } else if status == StatusCode::Ebadreq as i32 {
68 BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"bad request")
69 } else if status == StatusCode::Eio as i32 {
70 BkError::new(ErrorKind::BookieIoError)
71 } else if status == StatusCode::Eua as i32 {
72 BkError::new(ErrorKind::UnauthorizedAccess)
73 } else if status == StatusCode::Ebadversion as i32 {
74 BkError::new(ErrorKind::BookieBadVersion)
75 } else if status == StatusCode::Efenced as i32 {
76 BkError::new(ErrorKind::LedgerFenced)
77 } else if status == StatusCode::Ereadonly as i32 {
78 BkError::new(ErrorKind::BookieReadOnly)
79 } else if status == StatusCode::Etoomanyrequests as i32 {
80 BkError::new(ErrorKind::BookieTooManyRequests)
81 } else {
82 BkError::with_message(ErrorKind::BookieUnexpectedResponse, format!("unknown status code {}", status))
83 }
84}
85
86trait Operation {
87 type Response;
88
89 fn type_of() -> OperationType;
90
91 fn request_of(self, txn_id: TxnId) -> Request;
92
93 fn extract_response(response: &mut Response) -> Option<Self::Response>;
94
95 fn extract_status(_: &Self::Response) -> i32 {
96 StatusCode::Eok as i32
97 }
98
99 fn head_of(txn_id: TxnId) -> BkPacketHeader {
100 BkPacketHeader {
101 version: ProtocolVersion::VersionThree as i32,
102 operation: Self::type_of() as i32,
103 txn_id: txn_id.into(),
104 ..Default::default()
105 }
106 }
107
108 fn response_of(mut response: Response) -> Result<Self::Response> {
109 let op_type = response.header.operation;
110 if op_type != Self::type_of() as i32 {
111 let msg = format!("expect response type {:?}, got {:?}", Self::type_of(), op_type);
112 let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
113 return Err(err);
114 }
115 if response.status != StatusCode::Eok as i32 {
116 let err = response_status_to_error(response.status);
117 return Err(err);
118 }
119 let Some(inner_response) = Self::extract_response(&mut response) else {
120 let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no response");
121 return Err(err);
122 };
123 let status = Self::extract_status(&inner_response);
124 if status != StatusCode::Eok as i32 {
125 let err = response_status_to_error(response.status);
126 return Err(err);
127 }
128 Ok(inner_response)
129 }
130}
131
132impl Operation for ReadRequest {
133 type Response = ReadResponse;
134
135 fn type_of() -> OperationType {
136 OperationType::ReadEntry
137 }
138
139 fn request_of(self, txn_id: TxnId) -> Request {
140 Request { header: Self::head_of(txn_id), read_request: Some(self), ..Default::default() }
141 }
142
143 fn extract_response(response: &mut Response) -> Option<Self::Response> {
144 response.read_response.take()
145 }
146
147 fn extract_status(response: &Self::Response) -> i32 {
148 response.status
149 }
150}
151
152impl Operation for AddRequest {
153 type Response = AddResponse;
154
155 fn type_of() -> OperationType {
156 OperationType::AddEntry
157 }
158
159 fn request_of(self, txn_id: TxnId) -> Request {
160 Request { header: Self::head_of(txn_id), add_request: Some(self), ..Default::default() }
161 }
162
163 fn extract_response(response: &mut Response) -> Option<Self::Response> {
164 response.add_response.take()
165 }
166
167 fn extract_status(response: &Self::Response) -> i32 {
168 response.status
169 }
170}
171
172impl Operation for WriteLacRequest {
173 type Response = WriteLacResponse;
174
175 fn type_of() -> OperationType {
176 OperationType::WriteLac
177 }
178
179 fn request_of(self, txn_id: TxnId) -> Request {
180 Request { header: Self::head_of(txn_id), write_lac_request: Some(self), ..Default::default() }
181 }
182
183 fn extract_response(response: &mut Response) -> Option<Self::Response> {
184 response.write_lac_response.take()
185 }
186
187 fn extract_status(response: &Self::Response) -> i32 {
188 response.status
189 }
190}
191
192impl Operation for ReadLacRequest {
193 type Response = ReadLacResponse;
194
195 fn type_of() -> OperationType {
196 OperationType::ReadLac
197 }
198
199 fn request_of(self, txn_id: TxnId) -> Request {
200 Request { header: Self::head_of(txn_id), read_lac_request: Some(self), ..Default::default() }
201 }
202
203 fn extract_response(response: &mut Response) -> Option<Self::Response> {
204 response.read_lac_response.take()
205 }
206
207 fn extract_status(response: &Self::Response) -> i32 {
208 response.status
209 }
210}
211
212impl Operation for StartTlsRequest {
213 type Response = StartTlsResponse;
214
215 fn type_of() -> OperationType {
216 OperationType::StartTls
217 }
218
219 fn request_of(self, txn_id: TxnId) -> Request {
220 Request { header: Self::head_of(txn_id), start_tls_request: Some(self), ..Default::default() }
221 }
222
223 fn extract_response(response: &mut Response) -> Option<Self::Response> {
224 response.start_tls_response.take()
225 }
226}
227
228impl Operation for ForceLedgerRequest {
229 type Response = ForceLedgerResponse;
230
231 fn type_of() -> OperationType {
232 OperationType::ForceLedger
233 }
234
235 fn request_of(self, txn_id: TxnId) -> Request {
236 Request { header: Self::head_of(txn_id), force_ledger_request: Some(self), ..Default::default() }
237 }
238
239 fn extract_response(response: &mut Response) -> Option<Self::Response> {
240 response.force_ledger_response.take()
241 }
242
243 fn extract_status(response: &Self::Response) -> i32 {
244 response.status
245 }
246}
247
248impl Operation for GetBookieInfoRequest {
249 type Response = GetBookieInfoResponse;
250
251 fn type_of() -> OperationType {
252 OperationType::GetBookieInfo
253 }
254
255 fn request_of(self, txn_id: TxnId) -> Request {
256 Request { header: Self::head_of(txn_id), get_bookie_info_request: Some(self), ..Default::default() }
257 }
258
259 fn extract_response(response: &mut Response) -> Option<Self::Response> {
260 response.get_bookie_info_response.take()
261 }
262
263 fn extract_status(response: &Self::Response) -> i32 {
264 response.status
265 }
266}
267
268impl Operation for GetListOfEntriesOfLedgerRequest {
269 type Response = GetListOfEntriesOfLedgerResponse;
270
271 fn type_of() -> OperationType {
272 OperationType::GetListOfEntriesOfLedger
273 }
274
275 fn request_of(self, txn_id: TxnId) -> Request {
276 Request {
277 header: Self::head_of(txn_id),
278 get_list_of_entries_of_ledger_request: Some(self),
279 ..Default::default()
280 }
281 }
282
283 fn extract_response(response: &mut Response) -> Option<Self::Response> {
284 response.get_list_of_entries_of_ledger_response.take()
285 }
286
287 fn extract_status(response: &Self::Response) -> i32 {
288 response.status
289 }
290}
291
/// An entry to be written by `Client::add_entry`.
pub struct AddingEntry<'a> {
    pub ledger_id: LedgerId,
    pub entry_id: EntryId,
    /// Last add confirmed piggybacked in the entry header.
    pub last_add_confirmed: EntryId,
    /// Ledger length stored in the entry header; presumably includes this entry's payload
    /// (readers reject headers whose value is smaller than the payload length).
    pub accumulated_ledger_length: LedgerLength,
    pub payload: &'a [u8],
}

/// Options for `Client::add_entry`.
pub struct AddOptions<'a> {
    /// Sends the add with the `RecoveryAdd` flag.
    pub recovery_add: bool,
    /// NOTE(review): not read by `Client::add_entry`.
    pub high_priority: bool,
    /// Sets the `DEFERRED_WRITE` write flag on the request.
    pub deferred_sync: bool,
    /// Ledger master key sent with the request.
    pub master_key: &'a [u8],
    /// Algorithm used to digest the entry header and payload.
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// An entry read back from a bookie after digest verification.
pub struct FetchedEntry {
    /// Larger of the entry's piggybacked LAC and the response's `max_lac`.
    pub max_lac: EntryId,
    /// LAC piggybacked in the entry header.
    pub last_add_confirmed: EntryId,
    /// Accumulated ledger length from the entry header.
    pub ledger_length: LedgerLength,
    /// Entry payload with header and digest stripped.
    pub payload: Vec<u8>,
}

/// Options for entry reads.
pub struct ReadOptions<'a> {
    /// Sends the read with the `FenceLedger` flag.
    pub fence_ledger: bool,
    /// NOTE(review): not read by `Client::fetch_entry_internal`.
    pub high_priority: bool,
    /// Algorithm used to verify the returned entry.
    pub digest_algorithm: &'a DigestAlgorithm,
    /// Master key sent with the request when present.
    pub master_key: Option<&'a [u8]>,
}

/// Options for `Client::poll_entry`.
pub struct PollOptions<'a> {
    /// Long-poll timeout, sent to the bookie in milliseconds.
    pub timeout: Duration,
    /// Algorithm used to verify a piggybacked entry.
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// Options for `Client::write_lac`.
pub struct WriteLacOptions<'a> {
    /// Ledger master key sent with the request.
    pub master_key: &'a [u8],
    /// Algorithm used to digest the LAC body.
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// Header fields of an entry whose digest has been verified.
struct EntryMetadata {
    entry_id: EntryId,
    last_add_confirmed: EntryId,
    ledger_length: LedgerLength,
}

/// A verified entry: header fields plus payload with header and digest removed.
struct EntryContent {
    entry_id: EntryId,
    last_add_confirmed: EntryId,
    ledger_length: LedgerLength,
    payload: Vec<u8>,
}
344
345struct EntryBody(Vec<u8>);
346
347impl EntryBody {
348 fn corrupted(&mut self) -> BkError {
349 BkError::with_description(ErrorKind::EntryInvalidData, &"corrupted data")
350 }
351
352 fn extract_explicit_lac(&mut self, ledger_id: LedgerId, mut digester: Digester) -> Result<EntryId> {
353 let digest_len = digester.digest_length();
354 let prefix_len = LAC_METADATA_LENGTH + digest_len;
355 let body = &self.0;
356 if body.len() < prefix_len {
357 return Err(self.corrupted());
358 }
359 let mut metadata = &body[..LAC_METADATA_LENGTH];
360 digester.update(metadata);
361 #[allow(invalid_value)]
362 #[allow(clippy::uninit_assumed_init)]
363 let mut buf: [u8; MAX_DIGEST_LENGTH] = unsafe { MaybeUninit::uninit().assume_init() };
364 digester.digest(&mut &mut buf[..digest_len]);
365 if buf[..digest_len] != body[LAC_METADATA_LENGTH..prefix_len] {
366 return Err(self.corrupted());
367 }
368 let actual_ledger_id = metadata.get_i64();
369 if ledger_id != actual_ledger_id {
370 let msg = format!("expect ledger id {} in reading lac, got {}", ledger_id, actual_ledger_id);
371 let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
372 return Err(err);
373 }
374 let lac = metadata.get_i64();
375 Ok(EntryId(lac))
376 }
377
378 fn verify_entry(
379 &mut self,
380 ledger_id: LedgerId,
381 entry_id: EntryId,
382 mut digester: Digester,
383 ) -> Result<EntryMetadata> {
384 let digest_len = digester.digest_length();
385 let prefix_len = ENTRY_METADATA_LENGTH + digest_len;
386 let body = &self.0;
387 if body.len() < prefix_len {
388 return Err(self.corrupted());
389 }
390 let mut metadata = &body[..ENTRY_METADATA_LENGTH];
391 digester.update(metadata);
392 digester.update(&body[prefix_len..]);
393 #[allow(invalid_value)]
394 #[allow(clippy::uninit_assumed_init)]
395 let mut buf: [u8; MAX_DIGEST_LENGTH] = unsafe { MaybeUninit::uninit().assume_init() };
396 digester.digest(&mut &mut buf[..digest_len]);
397 if buf[..digest_len] != body[ENTRY_METADATA_LENGTH..prefix_len] {
398 return Err(self.corrupted());
399 }
400 let actual_ledger_id = metadata.get_i64();
401 let actual_entry_id = metadata.get_i64();
402 let last_add_confirmed = metadata.get_i64();
403 let accumulated_payload_len = metadata.get_i64();
404 let payload_len = body.len() - prefix_len;
405 if ledger_id != actual_ledger_id {
406 let msg = format!("expect ledger id {} in reading lac, got {}", ledger_id, actual_ledger_id);
407 let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
408 return Err(err);
409 }
410 if entry_id != EntryId::INVALID && entry_id != actual_entry_id {
411 let msg = format!("expect entry id {:?}, got {}", entry_id, actual_entry_id);
412 let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
413 return Err(err);
414 }
415 if last_add_confirmed >= actual_entry_id {
416 return Err(self.corrupted());
417 }
418 if accumulated_payload_len < payload_len as i64 {
419 let msg =
420 format!("accumulated ledger length {} < entry payload length {}", accumulated_payload_len, payload_len);
421 let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
422 return Err(err);
423 }
424 Ok(EntryMetadata {
425 entry_id: EntryId(actual_entry_id),
426 last_add_confirmed: EntryId(last_add_confirmed),
427 ledger_length: accumulated_payload_len.into(),
428 })
429 }
430
431 fn extract_piggyback_lac(&mut self, ledger_id: LedgerId, entry_id: EntryId, digester: Digester) -> Result<EntryId> {
432 let EntryMetadata { last_add_confirmed, .. } = self.verify_entry(ledger_id, entry_id, digester)?;
433 Ok(last_add_confirmed)
434 }
435
436 fn extract_entry(mut self, ledger_id: LedgerId, entry_id: EntryId, digester: Digester) -> Result<EntryContent> {
437 let digest_len = digester.digest_length();
438 let EntryMetadata { entry_id: actual_entry_id, last_add_confirmed, ledger_length } =
439 self.verify_entry(ledger_id, entry_id, digester)?;
440 let mut body = self.0;
441 let prefix_len = ENTRY_METADATA_LENGTH + digest_len;
442 body.drain(..prefix_len);
443 Ok(EntryContent { entry_id: actual_entry_id, last_add_confirmed, ledger_length, payload: body })
444 }
445}
446
/// A serialized request queued for `io_loop`.
#[derive(Debug)]
struct MessageRequest {
    /// Transaction id used to route the response back to `responser`.
    txn_id: TxnId,
    /// Length-prefixed encoded `Request`; returned to the `State` buffer pool once written.
    bytes: Vec<u8>,
    /// Completed with the decoded response, or with the connection error.
    responser: Sender<Result<Response>>,
}
453
454async fn io_loop(sock: TcpStream, state: Arc<State>, mut request_receiver: mpsc::Receiver<MessageRequest>) {
455 let mut read_buf = Vec::with_capacity(4096);
456 let mut pending_requests: VecDeque<MessageRequest> = VecDeque::with_capacity(512);
457 let mut pending_txns: HashMap<TxnId, oneshot::Sender<Result<Response>>> = HashMap::with_capacity(20);
458 let mut io_slices: Vec<std::io::IoSlice<'_>> = Vec::with_capacity(20);
459 let mut first_request_offset = 0usize;
460 let err = 'io: loop {
461 select! {
462 _ = sock.readable() => {
463 match sock.try_read_buf(&mut read_buf) {
464 Ok(0) => {
465 let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie closed");
466 break err;
467 },
468 Ok(_) => {
469 while read_buf.len() > 4 {
470 let mut len_buf = &read_buf[..4];
471 let msg_len = len_buf.get_u32() as usize + 4;
472 if read_buf.len() < msg_len {
473 continue;
474 }
475 let response_buf = &read_buf[4..msg_len];
476 let response = match Response::decode(response_buf) {
477 Err(e) => break 'io BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"decode failure").cause_by(e),
478 Ok(response) => response,
479 };
480 drop(read_buf.drain(..msg_len));
481 let txn_id = TxnId(response.header.txn_id);
482 if let Some(responser) = pending_txns.remove(&txn_id) {
483 responser.send(Ok(response)).ignore();
484 }
485 }
486 },
487 Err(e) => {
488 if e.kind() == io::ErrorKind::WouldBlock {
489 continue;
490 }
491 let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie read failed").cause_by(e);
492 break err;
493 },
494 }
495 },
496 r = request_receiver.recv(), if pending_requests.len() < 512 => {
497 match r {
498 Some(request) => {
499 pending_requests.push_back(request);
500 },
501 None => return,
502 };
503 },
504 _ = sock.writable(), if !pending_requests.is_empty() => {
505 io_slices.clear();
506 io_slices.push(std::io::IoSlice::new(unsafe {&*(&pending_requests[0].bytes[first_request_offset..] as *const [u8])}));
507 pending_requests.iter().skip(1).for_each(|r| {
508 io_slices.push(std::io::IoSlice::new(unsafe {&*(&r.bytes[first_request_offset..] as *const [u8])}));
509 });
510 match sock.try_write_vectored(&io_slices) {
511 Ok(mut n) => {
512 if n < pending_requests[0].bytes.len() - first_request_offset {
513 first_request_offset += n;
514 continue;
515 }
516 n -= pending_requests[0].bytes.len() - first_request_offset;
517 let MessageRequest{txn_id, responser, bytes} = pending_requests.pop_front().unwrap();
518 pending_txns.insert(txn_id, responser);
519 state.release_buf(bytes);
520 first_request_offset = 0;
521 while let Some(request) = pending_requests.front() {
522 if n < request.bytes.len() {
523 first_request_offset = n;
524 break;
525 }
526 let MessageRequest{txn_id, responser, bytes} = pending_requests.pop_front().unwrap();
527 n -= bytes.len();
528 pending_txns.insert(txn_id, responser);
529 state.release_buf(bytes);
530 }
531 },
532 Err(e) => {
533 if e.kind() == io::ErrorKind::WouldBlock {
534 continue;
535 }
536 let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie write failed").cause_by(e);
537 break err;
538 }
539 }
540 },
541 }
542 };
543 unsafe { state.set_err(err.clone()) };
546 request_receiver.close();
547 pending_requests.into_iter().for_each(|MessageRequest { responser, .. }| responser.send(Err(err.clone())).ignore());
548 loop {
549 let MessageRequest { responser, .. } = match request_receiver.recv().await {
550 None => break,
551 Some(request) => request,
552 };
553 responser.send(Err(err.clone())).ignore();
554 }
555}
556
557struct State {
558 txn_id_counter: AtomicU64,
559 err: AtomicPtr<BkError>,
560 buffers: Mutex<Vec<Vec<u8>>>,
561}
562
563impl State {
564 fn new() -> State {
565 State {
566 txn_id_counter: AtomicU64::new(1),
567 err: AtomicPtr::new(std::ptr::null_mut()),
568 buffers: Mutex::new(Vec::with_capacity(512)),
569 }
570 }
571
572 fn next_txn_id(&self) -> TxnId {
573 let txn_id = self.txn_id_counter.fetch_add(1, Ordering::Relaxed);
574 TxnId::from(txn_id)
575 }
576
577 fn release_buf(&self, mut buf: Vec<u8>) {
578 buf.clear();
579 let mut buffers = self.buffers.lock().unwrap();
580 buffers.push(buf);
581 }
582
583 fn get_buf(&self) -> Vec<u8> {
584 let mut buffers = self.buffers.lock().unwrap();
585 let buf = buffers.pop();
586 drop(buffers);
587 buf.unwrap_or_else(|| Vec::with_capacity(1024))
588 }
589
590 unsafe fn get_err(&self) -> Option<BkError> {
591 let ptr = self.err.load(Ordering::Relaxed);
592 if ptr.is_null() {
593 return None;
594 }
595 let err = &mut *ptr;
596 Some(err.clone())
597 }
598
599 unsafe fn set_err(&self, err: BkError) {
600 let err = Box::into_raw(Box::new(err));
601 self.err.store(err, Ordering::Relaxed);
602 }
603}
604
605impl Drop for State {
606 fn drop(&mut self) {
607 let err = *self.err.get_mut();
608 if err.is_null() {
609 return;
610 }
611 let _ = unsafe { Box::from_raw(err) };
612 }
613}
614
/// Connection to a single bookie; requests are multiplexed over one TCP stream by an
/// `io_loop` task spawned in `Client::connect`.
pub struct Client {
    /// State shared with the connection's `io_loop` task.
    state: Arc<State>,
    /// Queue of serialized requests consumed by `io_loop` (created with capacity 512).
    requester: mpsc::Sender<MessageRequest>,
}

/// Client configuration; currently carries no options.
pub struct Configuration {}
621
622enum ClientRequest {
623 Create { bookie_id: BookieId, responser: oneshot::Sender<Result<Arc<Client>>> },
624}
625
626impl std::fmt::Debug for ClientRequest {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
628 match self {
629 ClientRequest::Create { bookie_id, .. } => write!(f, "client creation request for {}", bookie_id),
630 }
631 }
632}
633
634async fn connect(registry: BookieRegistrySnapshot, bookie_id: BookieId) -> (BookieId, Result<Client>) {
635 let bookie = match registry.get_service_info(&bookie_id) {
636 None => return (bookie_id, Err(BkError::with_description(ErrorKind::BookieNotAvailable, &"no service info"))),
637 Some(bookie) => bookie,
638 };
639 let endpoint = match bookie.rpc_endpoint() {
640 None => return (bookie_id, Err(BkError::with_description(ErrorKind::BookieNotAvailable, &"no rpc endpoint"))),
641 Some(endpoint) => endpoint,
642 };
643 let client = Client::connect((endpoint.host.as_str(), endpoint.port), &Configuration {}).await;
644 (bookie_id, client)
645}
646
647fn get_client(clients: &RwLock<HashMap<BookieId, Arc<Client>>>, bookie_id: &BookieId) -> Option<Arc<Client>> {
648 let clients = clients.read().unwrap();
649 return clients.get(bookie_id).cloned();
650}
651
/// Background task that establishes connections on behalf of a `PoolledClient`.
///
/// For each `ClientRequest::Create`:
/// - if `clients` already holds a client for the bookie, it is returned immediately;
/// - otherwise the responder is queued under the bookie id. Only the first queued responder
///   for a bookie starts a `connect` future, so concurrent requests share one attempt.
///
/// When a connection attempt finishes, every queued responder for that bookie receives the
/// result, and a successful client is inserted into `clients`. The loop ends when the request
/// channel is closed, i.e. once all senders have been dropped.
async fn client_request_loop(
    clients: Arc<RwLock<HashMap<BookieId, Arc<Client>>>>,
    mut requester: mpsc::UnboundedReceiver<ClientRequest>,
    registry: BookieRegistry,
) {
    // Responders waiting on an in-flight connection attempt, keyed by bookie.
    let mut pending_requests: HashMap<BookieId, Vec<_>> = HashMap::new();
    // In-flight `connect` futures.
    let mut connecting_clients = VecDeque::new();
    let mut registry_snapshot = registry.snapshot();
    // Emptied responder vectors kept for reuse.
    let mut released_responsers = Vec::new();
    loop {
        select! {
            r = requester.recv() => {
                let ClientRequest::Create {bookie_id, responser} = match r {
                    None => break,
                    Some(request) => request,
                };
                if let Some(client) = get_client(&clients, &bookie_id) {
                    responser.send(Ok(client)).ignore();
                    continue;
                }
                let entry = pending_requests.entry(bookie_id.clone());
                let responsers = entry.or_insert_with(|| released_responsers.pop().unwrap_or_default());
                // An empty list means no attempt is in flight for this bookie yet.
                if responsers.is_empty() {
                    // Presumably refreshes the snapshot so newly registered bookies resolve — confirm.
                    registry.update(&mut registry_snapshot);
                    connecting_clients.push_back(connect(registry_snapshot.clone(), bookie_id).fuse());
                }
                responsers.push(responser);
            },
            // NOTE(review): `SelectIterable::next` yields the index and output of a completed
            // future; its behaviour on an empty `connecting_clients` is not visible here —
            // presumably it stays pending. Confirm.
            (i, (bookie_id, r)) = SelectIterable::next(&mut connecting_clients) => {
                connecting_clients.swap_remove_back(i);
                let (_, mut responsers) = pending_requests.remove_entry(&bookie_id).unwrap();
                let result = r.map(Arc::new);
                responsers.drain(..).for_each(|responser| {
                    responser.send(result.clone()).ignore();
                });
                if let Ok(client) = result {
                    clients.write().unwrap().insert(bookie_id, client);
                }
                released_responsers.push(responsers);
            },
        }
    }
}
695
696pub(crate) struct PoolledClient {
697 clients: Arc<RwLock<HashMap<BookieId, Arc<Client>>>>,
698 requester: mpsc::UnboundedSender<ClientRequest>,
699}
700
701impl PoolledClient {
702 pub fn new(bookie_registory: BookieRegistry) -> PoolledClient {
703 let clients = Arc::new(RwLock::new(HashMap::with_capacity(512)));
704 let (sender, receiver) = mpsc::unbounded_channel();
705 tokio::spawn(client_request_loop(clients.clone(), receiver, bookie_registory));
706 PoolledClient { clients, requester: sender }
707 }
708
709 pub fn prepare_ensemble(&self, bookies: &[BookieId]) {
710 let mut has_clients = vec![false; bookies.len()];
711 let clients = self.clients.read().unwrap();
712 for (i, bookie_id) in bookies.iter().enumerate() {
713 has_clients[i] = clients.contains_key(bookie_id);
714 }
715 drop(clients);
716 for (i, has) in has_clients.iter().enumerate() {
717 if !has {
718 let bookie_id = &bookies[i];
719 let (sender, _) = oneshot::channel();
720 self.requester.send(ClientRequest::Create { bookie_id: bookie_id.clone(), responser: sender }).unwrap();
721 }
722 }
723 }
724
725 fn get_client(&self, bookie_id: &BookieId) -> Option<Arc<Client>> {
726 let clients = self.clients.read().unwrap();
727 clients.get(bookie_id.as_str()).cloned()
728 }
729
730 async fn create_or_get_client(&self, bookie_id: &BookieId) -> Result<Arc<Client>> {
731 if let Some(client) = self.get_client(bookie_id) {
732 return Ok(client);
733 }
734 let (sender, receiver) = oneshot::channel();
735 self.requester.send(ClientRequest::Create { bookie_id: bookie_id.to_owned(), responser: sender }).unwrap();
736 receiver.await.unwrap()
737 }
738
739 #[allow(dead_code)]
740 fn remove_client(&self, bookie_id: &BookieId, client: Arc<Client>) {
741 let mut clients = self.clients.write().unwrap();
742 match clients.get(bookie_id) {
743 Some(other) if Arc::ptr_eq(&client, other) => clients.remove(bookie_id),
744 _ => return,
745 };
746 drop(clients);
747 }
748
749 pub async fn read_last_entry(
750 &self,
751 bookie_id: &BookieId,
752 ledger_id: LedgerId,
753 options: &ReadOptions<'_>,
754 ) -> Result<(EntryId, FetchedEntry)> {
755 let client = self.create_or_get_client(bookie_id).await?;
756 client.read_last_entry(ledger_id, options).await
757 }
758
759 pub async fn read_entry(
760 &self,
761 bookie_id: &BookieId,
762 ledger_id: LedgerId,
763 entry_id: EntryId,
764 options: &ReadOptions<'_>,
765 ) -> Result<FetchedEntry> {
766 let client = self.create_or_get_client(bookie_id).await?;
767 client.read_entry(ledger_id, entry_id, options).await
768 }
769
770 pub async fn poll_entry(
771 &self,
772 bookie_id: &BookieId,
773 ledger_id: LedgerId,
774 entry_id: EntryId,
775 options: &PollOptions<'_>,
776 ) -> Result<PolledEntry> {
777 let client = self.create_or_get_client(bookie_id).await?;
778 client.poll_entry(ledger_id, entry_id, options).await
779 }
780
781 pub async fn write_lac(
782 &self,
783 bookie_id: &BookieId,
784 ledger_id: LedgerId,
785 explicit_lac: EntryId,
786 options: &WriteLacOptions<'_>,
787 ) -> Result<()> {
788 let client = self.create_or_get_client(bookie_id).await?;
789 client.write_lac(ledger_id, explicit_lac, options).await
790 }
791
792 pub async fn read_lac(
793 &self,
794 bookie_id: &BookieId,
795 ledger_id: LedgerId,
796 digest_algorithm: &DigestAlgorithm,
797 ) -> Result<EntryId> {
798 let client = self.create_or_get_client(bookie_id).await?;
799 client.read_lac(ledger_id, digest_algorithm).await
800 }
801
802 pub async fn force_ledger(&self, bookie_id: &BookieId, ledger_id: LedgerId) -> Result<()> {
803 let client = self.create_or_get_client(bookie_id).await?;
804 client.force_ledger(ledger_id).await
805 }
806
807 pub async fn add_entry(
808 &self,
809 bookie_id: &BookieId,
810 entry: &AddingEntry<'_>,
811 options: &AddOptions<'_>,
812 ) -> Result<()> {
813 let client = self.create_or_get_client(bookie_id).await?;
814 client.add_entry(entry, options).await
815 }
816}
817
818impl Client {
819 pub async fn connect<A: ToSocketAddrs>(addr: A, _conf: &Configuration) -> Result<Client> {
820 impl From<io::Error> for BkError {
821 fn from(_: io::Error) -> BkError {
822 BkError::with_description(ErrorKind::BookieNotAvailable, &"can't connect to bookie")
823 }
824 }
825 let tcp_stream = TcpStream::connect(addr).await?;
826 let state = Arc::new(State::new());
827 let (sender, receiver) = mpsc::channel(512);
828 tokio::spawn(io_loop(tcp_stream, state.clone(), receiver));
829 let client = Client { state, requester: sender };
830 Ok(client)
831 }
832
833 fn serialize_request(&self, request: &Request) -> Result<Vec<u8>> {
834 let mut buf = self.state.get_buf();
835 unsafe { buf.set_len(4) };
836 if let Err(err) = request.encode(&mut buf) {
837 return Err(BkError::with_description(ErrorKind::UnexpectedError, &"fail to encode request").cause_by(err));
838 }
839 let len = buf.len() as u32 - 4;
840 let mut len_buf = &mut buf[..4];
841 len_buf.put_u32(len);
842 Ok(buf)
843 }
844
845 async fn send(&self, txn_id: TxnId, request: &Request) -> Result<oneshot::Receiver<Result<Response>>> {
846 let bytes = self.serialize_request(request)?;
847 let (sender, receiver) = oneshot::channel();
848 let message = MessageRequest { txn_id, bytes, responser: sender };
849 if self.requester.send(message).await.is_err() {
850 let err = unsafe { self.state.get_err().expect("no error after message channel closed") };
851 return Err(err);
852 }
853 Ok(receiver)
854 }
855
856 async fn request<T: Operation>(&self, operation: T) -> Result<T::Response> {
857 let txn_id = self.state.next_txn_id();
858 let request = operation.request_of(txn_id);
859 let receiver = self.send(txn_id, &request).await?;
860 let response = match receiver.await.expect("no response after message sent") {
861 Ok(response) => response,
862 Err(err) => return Err(err),
863 };
864 T::response_of(response)
865 }
866
867 pub async fn fetch_entry_internal(
868 &self,
869 ledger_id: LedgerId,
870 entry_id: EntryId,
871 options: &ReadOptions<'_>,
872 ) -> Result<(EntryId, FetchedEntry)> {
873 let flag = if options.fence_ledger { Some(read_request::Flag::FenceLedger.into()) } else { None };
874 let read = ReadRequest {
875 flag,
876 ledger_id: ledger_id.into(),
877 entry_id: entry_id.into(),
878 master_key: options.master_key.map(|key| key.to_owned()),
879 ..Default::default()
880 };
881 let response = self.request(read).await?;
882 let max_lac = response.max_lac.unwrap_or(INVALID_ENTRY_ID);
883 let Some(body) = response.body else {
884 return Err(BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no entry body in response"));
885 };
886 let body = EntryBody(body);
887 let digester = options.digest_algorithm.digester();
888 let EntryContent { entry_id: actual_entry_id, payload, last_add_confirmed, ledger_length } =
889 body.extract_entry(ledger_id, entry_id, digester)?;
890 Ok((actual_entry_id, FetchedEntry {
891 last_add_confirmed,
892 max_lac: last_add_confirmed.max(EntryId(max_lac)),
893 ledger_length,
894 payload,
895 }))
896 }
897
898 pub async fn read_entry(
899 &self,
900 ledger_id: LedgerId,
901 entry_id: EntryId,
902 options: &ReadOptions<'_>,
903 ) -> Result<FetchedEntry> {
904 let (_, entry) = self.fetch_entry_internal(ledger_id, entry_id, options).await?;
905 Ok(entry)
906 }
907
908 pub async fn read_last_entry(
909 &self,
910 ledger_id: LedgerId,
911 options: &ReadOptions<'_>,
912 ) -> Result<(EntryId, FetchedEntry)> {
913 self.fetch_entry_internal(ledger_id, LAST_ADD_CONFIRMED, options).await
914 }
915
916 pub async fn poll_entry(
917 &self,
918 ledger_id: LedgerId,
919 entry_id: EntryId,
920 options: &PollOptions<'_>,
921 ) -> Result<PolledEntry> {
922 let previous_lac = entry_id - 1;
923 let read = ReadRequest {
924 flag: Some(read_request::Flag::EntryPiggyback.into()),
925 ledger_id: ledger_id.into(),
926 entry_id: LAST_ADD_CONFIRMED.into(),
927 previous_lac: Some(previous_lac.into()),
928 time_out: Some(options.timeout.as_millis() as i64),
929 ..Default::default()
930 };
931 let response = self.request(read).await?;
932 let Some(max_lac) = response.max_lac else {
933 let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no last add confirmed");
934 return Err(err);
935 };
936 if response.entry_id == LAST_ADD_CONFIRMED.0 {
937 return Ok(PolledEntry { last_add_confirmed: EntryId(max_lac), payload: None });
938 }
939 let Some(body) = response.body else {
940 let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no entry body in response");
941 return Err(err);
942 };
943 let body = EntryBody(body);
944 let digester = options.digest_algorithm.digester();
945 let EntryContent { payload, last_add_confirmed, .. } = body.extract_entry(ledger_id, entry_id, digester)?;
946 Ok(PolledEntry { last_add_confirmed: last_add_confirmed.max(EntryId(max_lac)), payload: Some(payload) })
947 }
948
949 pub async fn write_lac(
950 &self,
951 ledger_id: LedgerId,
952 explicit_lac: EntryId,
953 options: &WriteLacOptions<'_>,
954 ) -> Result<()> {
955 let mut digester = options.digest_algorithm.digester();
956 let mut body = Vec::with_capacity(LAC_METADATA_LENGTH + digester.digest_length());
957 body.put_i64(ledger_id.into());
958 body.put_i64(explicit_lac.into());
959 digester.update(&body);
960 digester.digest(&mut body);
961 let request = WriteLacRequest {
962 ledger_id: ledger_id.into(),
963 lac: explicit_lac.into(),
964 master_key: options.master_key.to_vec(),
965 body,
966 };
967 self.request(request).await?;
968 Ok(())
969 }
970
971 pub async fn read_lac(&self, ledger_id: LedgerId, digest_algorithm: &DigestAlgorithm) -> Result<EntryId> {
972 let request = ReadLacRequest { ledger_id: ledger_id.into() };
973 let response = self.request(request).await?;
974 let explicit_lac = if let Some(body) = response.lac_body {
975 let mut body = EntryBody(body);
976 body.extract_explicit_lac(ledger_id, digest_algorithm.digester())?
977 } else {
978 EntryId::INVALID
979 };
980 let piggyback_lac = if let Some(body) = response.last_entry_body {
981 let mut body = EntryBody(body);
982 body.extract_piggyback_lac(ledger_id, EntryId::INVALID, digest_algorithm.digester())?
983 } else {
984 EntryId::INVALID
985 };
986 Ok(explicit_lac.max(piggyback_lac))
987 }
988
989 pub async fn force_ledger(&self, ledger_id: LedgerId) -> Result<()> {
990 let request = ForceLedgerRequest { ledger_id: ledger_id.into() };
991 let _response = self.request(request).await?;
992 Ok(())
993 }
994
995 pub async fn add_entry(&self, entry: &AddingEntry<'_>, options: &AddOptions<'_>) -> Result<()> {
996 let mut digester = options.digest_algorithm.digester();
997 let mut body = Vec::with_capacity(ENTRY_METADATA_LENGTH + digester.digest_length() + entry.payload.len());
998 let buf = &mut body;
999 buf.put_i64(entry.ledger_id.into());
1000 buf.put_i64(entry.entry_id.into());
1001 buf.put_i64(entry.last_add_confirmed.into());
1002 buf.put_i64(entry.accumulated_ledger_length.into());
1003 digester.update(&body);
1004 digester.update(entry.payload);
1005 digester.digest(&mut body);
1006 body.extend_from_slice(entry.payload);
1007
1008 let mut request = AddRequest {
1009 ledger_id: entry.ledger_id.into(),
1010 entry_id: entry.entry_id.into(),
1011 body,
1012 master_key: Vec::from(options.master_key),
1013 ..Default::default()
1014 };
1015 if options.deferred_sync {
1016 request.write_flags = Some(DEFERRED_WRITE);
1017 }
1018 if options.recovery_add {
1019 request.flag = Some(add_request::Flag::RecoveryAdd.into());
1020 }
1021 self.request(request).await?;
1022 Ok(())
1023 }
1024}