bookkeeper_client/client/
bookie.rs

1use std::collections::{HashMap, VecDeque};
2use std::convert::Into;
3use std::io;
4use std::mem::MaybeUninit;
5use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex, RwLock};
7use std::time::Duration;
8
9use bytes::buf::{Buf, BufMut};
10use futures::future::FutureExt;
11use ignore_result::Ignore;
12use prost::Message;
13use tokio::net::{TcpStream, ToSocketAddrs};
14use tokio::select;
15use tokio::sync::mpsc;
16use tokio::sync::oneshot::{self, Sender};
17
18use super::digest::traits::{Algorithm as _, Digester as _};
19use super::digest::{Algorithm as DigestAlgorithm, Digester};
20use super::errors::{BkError, ErrorKind};
21use super::metadata::{BookieId, EntryId, LedgerId, LedgerLength};
22use crate::future::SelectIterable;
23use crate::meta::util::{BookieRegistry, BookieRegistrySnapshot};
24use crate::proto::*;
25
/// Crate-local result alias whose error type is always [`BkError`].
pub type Result<T> = std::result::Result<T, BkError>;

// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
static LAST_ADD_CONFIRMED: EntryId = EntryId::INVALID;

/// ledger_id + entry_id + piggyback lac + ledger length
/// (four 8-byte integers, decoded with `get_i64` in `EntryBody::verify_entry`).
const ENTRY_METADATA_LENGTH: usize = 32;

/// ledger_id + explicit lac
/// (two 8-byte integers, decoded with `get_i64` in `EntryBody::extract_explicit_lac`).
const LAC_METADATA_LENGTH: usize = 16;

// NOTE(review): presumably the add-request flag value for deferred writes; its use is outside
// this chunk — confirm.
const DEFERRED_WRITE: i32 = 1;
/// Raw wire value substituted when a response omits an entry id (e.g. a missing `max_lac`).
const INVALID_ENTRY_ID: i64 = -1;

/// Size of the stack buffer a computed digest is written into. A digester reporting a longer
/// `digest_length()` would make the `buf[..digest_len]` slicing panic.
const MAX_DIGEST_LENGTH: usize = 20;
40
/// Transaction id attached to every request header; responses carry it back so they can be
/// routed to the waiting caller.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
struct TxnId(u64);

impl From<u64> for TxnId {
    fn from(raw: u64) -> Self {
        Self(raw)
    }
}

impl From<TxnId> for u64 {
    fn from(id: TxnId) -> Self {
        let TxnId(raw) = id;
        raw
    }
}
55
/// Result of `poll_entry`: the last add confirmed seen by the bookie and, if available, the
/// polled entry's payload.
pub struct PolledEntry {
    pub last_add_confirmed: EntryId,
    /// Payload could be None in timed out or bookie error.
    pub payload: Option<Vec<u8>>,
}
61
62fn response_status_to_error(status: i32) -> BkError {
63    if status == StatusCode::Enoledger as i32 {
64        BkError::new(ErrorKind::LedgerNotExisted)
65    } else if status == StatusCode::Enoentry as i32 {
66        BkError::new(ErrorKind::EntryNotExisted)
67    } else if status == StatusCode::Ebadreq as i32 {
68        BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"bad request")
69    } else if status == StatusCode::Eio as i32 {
70        BkError::new(ErrorKind::BookieIoError)
71    } else if status == StatusCode::Eua as i32 {
72        BkError::new(ErrorKind::UnauthorizedAccess)
73    } else if status == StatusCode::Ebadversion as i32 {
74        BkError::new(ErrorKind::BookieBadVersion)
75    } else if status == StatusCode::Efenced as i32 {
76        BkError::new(ErrorKind::LedgerFenced)
77    } else if status == StatusCode::Ereadonly as i32 {
78        BkError::new(ErrorKind::BookieReadOnly)
79    } else if status == StatusCode::Etoomanyrequests as i32 {
80        BkError::new(ErrorKind::BookieTooManyRequests)
81    } else {
82        BkError::with_message(ErrorKind::BookieUnexpectedResponse, format!("unknown status code {}", status))
83    }
84}
85
86trait Operation {
87    type Response;
88
89    fn type_of() -> OperationType;
90
91    fn request_of(self, txn_id: TxnId) -> Request;
92
93    fn extract_response(response: &mut Response) -> Option<Self::Response>;
94
95    fn extract_status(_: &Self::Response) -> i32 {
96        StatusCode::Eok as i32
97    }
98
99    fn head_of(txn_id: TxnId) -> BkPacketHeader {
100        BkPacketHeader {
101            version: ProtocolVersion::VersionThree as i32,
102            operation: Self::type_of() as i32,
103            txn_id: txn_id.into(),
104            ..Default::default()
105        }
106    }
107
108    fn response_of(mut response: Response) -> Result<Self::Response> {
109        let op_type = response.header.operation;
110        if op_type != Self::type_of() as i32 {
111            let msg = format!("expect response type {:?}, got {:?}", Self::type_of(), op_type);
112            let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
113            return Err(err);
114        }
115        if response.status != StatusCode::Eok as i32 {
116            let err = response_status_to_error(response.status);
117            return Err(err);
118        }
119        let Some(inner_response) = Self::extract_response(&mut response) else {
120            let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no response");
121            return Err(err);
122        };
123        let status = Self::extract_status(&inner_response);
124        if status != StatusCode::Eok as i32 {
125            let err = response_status_to_error(response.status);
126            return Err(err);
127        }
128        Ok(inner_response)
129    }
130}
131
132impl Operation for ReadRequest {
133    type Response = ReadResponse;
134
135    fn type_of() -> OperationType {
136        OperationType::ReadEntry
137    }
138
139    fn request_of(self, txn_id: TxnId) -> Request {
140        Request { header: Self::head_of(txn_id), read_request: Some(self), ..Default::default() }
141    }
142
143    fn extract_response(response: &mut Response) -> Option<Self::Response> {
144        response.read_response.take()
145    }
146
147    fn extract_status(response: &Self::Response) -> i32 {
148        response.status
149    }
150}
151
152impl Operation for AddRequest {
153    type Response = AddResponse;
154
155    fn type_of() -> OperationType {
156        OperationType::AddEntry
157    }
158
159    fn request_of(self, txn_id: TxnId) -> Request {
160        Request { header: Self::head_of(txn_id), add_request: Some(self), ..Default::default() }
161    }
162
163    fn extract_response(response: &mut Response) -> Option<Self::Response> {
164        response.add_response.take()
165    }
166
167    fn extract_status(response: &Self::Response) -> i32 {
168        response.status
169    }
170}
171
172impl Operation for WriteLacRequest {
173    type Response = WriteLacResponse;
174
175    fn type_of() -> OperationType {
176        OperationType::WriteLac
177    }
178
179    fn request_of(self, txn_id: TxnId) -> Request {
180        Request { header: Self::head_of(txn_id), write_lac_request: Some(self), ..Default::default() }
181    }
182
183    fn extract_response(response: &mut Response) -> Option<Self::Response> {
184        response.write_lac_response.take()
185    }
186
187    fn extract_status(response: &Self::Response) -> i32 {
188        response.status
189    }
190}
191
192impl Operation for ReadLacRequest {
193    type Response = ReadLacResponse;
194
195    fn type_of() -> OperationType {
196        OperationType::ReadLac
197    }
198
199    fn request_of(self, txn_id: TxnId) -> Request {
200        Request { header: Self::head_of(txn_id), read_lac_request: Some(self), ..Default::default() }
201    }
202
203    fn extract_response(response: &mut Response) -> Option<Self::Response> {
204        response.read_lac_response.take()
205    }
206
207    fn extract_status(response: &Self::Response) -> i32 {
208        response.status
209    }
210}
211
212impl Operation for StartTlsRequest {
213    type Response = StartTlsResponse;
214
215    fn type_of() -> OperationType {
216        OperationType::StartTls
217    }
218
219    fn request_of(self, txn_id: TxnId) -> Request {
220        Request { header: Self::head_of(txn_id), start_tls_request: Some(self), ..Default::default() }
221    }
222
223    fn extract_response(response: &mut Response) -> Option<Self::Response> {
224        response.start_tls_response.take()
225    }
226}
227
228impl Operation for ForceLedgerRequest {
229    type Response = ForceLedgerResponse;
230
231    fn type_of() -> OperationType {
232        OperationType::ForceLedger
233    }
234
235    fn request_of(self, txn_id: TxnId) -> Request {
236        Request { header: Self::head_of(txn_id), force_ledger_request: Some(self), ..Default::default() }
237    }
238
239    fn extract_response(response: &mut Response) -> Option<Self::Response> {
240        response.force_ledger_response.take()
241    }
242
243    fn extract_status(response: &Self::Response) -> i32 {
244        response.status
245    }
246}
247
248impl Operation for GetBookieInfoRequest {
249    type Response = GetBookieInfoResponse;
250
251    fn type_of() -> OperationType {
252        OperationType::GetBookieInfo
253    }
254
255    fn request_of(self, txn_id: TxnId) -> Request {
256        Request { header: Self::head_of(txn_id), get_bookie_info_request: Some(self), ..Default::default() }
257    }
258
259    fn extract_response(response: &mut Response) -> Option<Self::Response> {
260        response.get_bookie_info_response.take()
261    }
262
263    fn extract_status(response: &Self::Response) -> i32 {
264        response.status
265    }
266}
267
268impl Operation for GetListOfEntriesOfLedgerRequest {
269    type Response = GetListOfEntriesOfLedgerResponse;
270
271    fn type_of() -> OperationType {
272        OperationType::GetListOfEntriesOfLedger
273    }
274
275    fn request_of(self, txn_id: TxnId) -> Request {
276        Request {
277            header: Self::head_of(txn_id),
278            get_list_of_entries_of_ledger_request: Some(self),
279            ..Default::default()
280        }
281    }
282
283    fn extract_response(response: &mut Response) -> Option<Self::Response> {
284        response.get_list_of_entries_of_ledger_response.take()
285    }
286
287    fn extract_status(response: &Self::Response) -> i32 {
288        response.status
289    }
290}
291
/// An entry to append through `add_entry`, together with the metadata stored alongside it.
pub struct AddingEntry<'a> {
    pub ledger_id: LedgerId,
    pub entry_id: EntryId,
    /// Last add confirmed piggybacked with this entry.
    pub last_add_confirmed: EntryId,
    /// Presumably the ledger length up to and including this entry — TODO confirm. On read,
    /// `EntryBody::verify_entry` only requires it to be at least the entry's payload length.
    pub accumulated_ledger_length: LedgerLength,
    pub payload: &'a [u8],
}

/// Per-call options for `add_entry`.
pub struct AddOptions<'a> {
    // NOTE(review): how these flags are mapped onto the add request is outside this chunk.
    pub recovery_add: bool,
    pub high_priority: bool,
    pub deferred_sync: bool,
    pub master_key: &'a [u8],
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// An entry read back from a bookie.
pub struct FetchedEntry {
    /// Presumably the response's `max_lac`, or `INVALID_ENTRY_ID` when absent (see
    /// `fetch_entry_internal`) — confirm.
    pub max_lac: EntryId,
    pub last_add_confirmed: EntryId,
    pub ledger_length: LedgerLength,
    /// Presumably the payload with metadata header and digest stripped — confirm.
    pub payload: Vec<u8>,
}

/// Per-call options for entry reads.
pub struct ReadOptions<'a> {
    /// When set, the read request carries the `FenceLedger` flag.
    pub fence_ledger: bool,
    pub high_priority: bool,
    pub digest_algorithm: &'a DigestAlgorithm,
    /// Master key forwarded with the read request, if any.
    pub master_key: Option<&'a [u8]>,
}

/// Per-call options for `poll_entry`.
pub struct PollOptions<'a> {
    pub timeout: Duration,
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// Per-call options for `write_lac`.
pub struct WriteLacOptions<'a> {
    pub master_key: &'a [u8],
    pub digest_algorithm: &'a DigestAlgorithm,
}

/// Entry header fields decoded and validated by `EntryBody::verify_entry`.
struct EntryMetadata {
    entry_id: EntryId,
    last_add_confirmed: EntryId,
    ledger_length: LedgerLength,
}

/// A verified entry produced by `EntryBody::extract_entry`; `payload` excludes the metadata
/// header and digest.
struct EntryContent {
    entry_id: EntryId,
    last_add_confirmed: EntryId,
    ledger_length: LedgerLength,
    payload: Vec<u8>,
}
344
345struct EntryBody(Vec<u8>);
346
347impl EntryBody {
348    fn corrupted(&mut self) -> BkError {
349        BkError::with_description(ErrorKind::EntryInvalidData, &"corrupted data")
350    }
351
352    fn extract_explicit_lac(&mut self, ledger_id: LedgerId, mut digester: Digester) -> Result<EntryId> {
353        let digest_len = digester.digest_length();
354        let prefix_len = LAC_METADATA_LENGTH + digest_len;
355        let body = &self.0;
356        if body.len() < prefix_len {
357            return Err(self.corrupted());
358        }
359        let mut metadata = &body[..LAC_METADATA_LENGTH];
360        digester.update(metadata);
361        #[allow(invalid_value)]
362        #[allow(clippy::uninit_assumed_init)]
363        let mut buf: [u8; MAX_DIGEST_LENGTH] = unsafe { MaybeUninit::uninit().assume_init() };
364        digester.digest(&mut &mut buf[..digest_len]);
365        if buf[..digest_len] != body[LAC_METADATA_LENGTH..prefix_len] {
366            return Err(self.corrupted());
367        }
368        let actual_ledger_id = metadata.get_i64();
369        if ledger_id != actual_ledger_id {
370            let msg = format!("expect ledger id {} in reading lac, got {}", ledger_id, actual_ledger_id);
371            let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
372            return Err(err);
373        }
374        let lac = metadata.get_i64();
375        Ok(EntryId(lac))
376    }
377
378    fn verify_entry(
379        &mut self,
380        ledger_id: LedgerId,
381        entry_id: EntryId,
382        mut digester: Digester,
383    ) -> Result<EntryMetadata> {
384        let digest_len = digester.digest_length();
385        let prefix_len = ENTRY_METADATA_LENGTH + digest_len;
386        let body = &self.0;
387        if body.len() < prefix_len {
388            return Err(self.corrupted());
389        }
390        let mut metadata = &body[..ENTRY_METADATA_LENGTH];
391        digester.update(metadata);
392        digester.update(&body[prefix_len..]);
393        #[allow(invalid_value)]
394        #[allow(clippy::uninit_assumed_init)]
395        let mut buf: [u8; MAX_DIGEST_LENGTH] = unsafe { MaybeUninit::uninit().assume_init() };
396        digester.digest(&mut &mut buf[..digest_len]);
397        if buf[..digest_len] != body[ENTRY_METADATA_LENGTH..prefix_len] {
398            return Err(self.corrupted());
399        }
400        let actual_ledger_id = metadata.get_i64();
401        let actual_entry_id = metadata.get_i64();
402        let last_add_confirmed = metadata.get_i64();
403        let accumulated_payload_len = metadata.get_i64();
404        let payload_len = body.len() - prefix_len;
405        if ledger_id != actual_ledger_id {
406            let msg = format!("expect ledger id {} in reading lac, got {}", ledger_id, actual_ledger_id);
407            let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
408            return Err(err);
409        }
410        if entry_id != EntryId::INVALID && entry_id != actual_entry_id {
411            let msg = format!("expect entry id {:?}, got {}", entry_id, actual_entry_id);
412            let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
413            return Err(err);
414        }
415        if last_add_confirmed >= actual_entry_id {
416            return Err(self.corrupted());
417        }
418        if accumulated_payload_len < payload_len as i64 {
419            let msg =
420                format!("accumulated ledger length {} < entry payload length {}", accumulated_payload_len, payload_len);
421            let err = BkError::with_message(ErrorKind::BookieUnexpectedResponse, msg);
422            return Err(err);
423        }
424        Ok(EntryMetadata {
425            entry_id: EntryId(actual_entry_id),
426            last_add_confirmed: EntryId(last_add_confirmed),
427            ledger_length: accumulated_payload_len.into(),
428        })
429    }
430
431    fn extract_piggyback_lac(&mut self, ledger_id: LedgerId, entry_id: EntryId, digester: Digester) -> Result<EntryId> {
432        let EntryMetadata { last_add_confirmed, .. } = self.verify_entry(ledger_id, entry_id, digester)?;
433        Ok(last_add_confirmed)
434    }
435
436    fn extract_entry(mut self, ledger_id: LedgerId, entry_id: EntryId, digester: Digester) -> Result<EntryContent> {
437        let digest_len = digester.digest_length();
438        let EntryMetadata { entry_id: actual_entry_id, last_add_confirmed, ledger_length } =
439            self.verify_entry(ledger_id, entry_id, digester)?;
440        let mut body = self.0;
441        let prefix_len = ENTRY_METADATA_LENGTH + digest_len;
442        body.drain(..prefix_len);
443        Ok(EntryContent { entry_id: actual_entry_id, last_add_confirmed, ledger_length, payload: body })
444    }
445}
446
/// A serialized request queued for `io_loop`, with the channel its response is delivered on.
#[derive(Debug)]
struct MessageRequest {
    /// Transaction id used to match the bookie's response back to `responser`.
    txn_id: TxnId,
    /// Length-prefixed encoded request; handed back to the buffer pool once fully written.
    bytes: Vec<u8>,
    responser: Sender<Result<Response>>,
}
453
454async fn io_loop(sock: TcpStream, state: Arc<State>, mut request_receiver: mpsc::Receiver<MessageRequest>) {
455    let mut read_buf = Vec::with_capacity(4096);
456    let mut pending_requests: VecDeque<MessageRequest> = VecDeque::with_capacity(512);
457    let mut pending_txns: HashMap<TxnId, oneshot::Sender<Result<Response>>> = HashMap::with_capacity(20);
458    let mut io_slices: Vec<std::io::IoSlice<'_>> = Vec::with_capacity(20);
459    let mut first_request_offset = 0usize;
460    let err = 'io: loop {
461        select! {
462            _ = sock.readable() => {
463                match sock.try_read_buf(&mut read_buf) {
464                    Ok(0) =>  {
465                        let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie closed");
466                        break err;
467                    },
468                    Ok(_) => {
469                        while read_buf.len() > 4 {
470                            let mut len_buf = &read_buf[..4];
471                            let msg_len = len_buf.get_u32() as usize + 4;
472                            if read_buf.len() < msg_len {
473                                continue;
474                            }
475                            let response_buf = &read_buf[4..msg_len];
476                            let response = match Response::decode(response_buf) {
477                                Err(e) => break 'io BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"decode failure").cause_by(e),
478                                Ok(response) => response,
479                            };
480                            drop(read_buf.drain(..msg_len));
481                            let txn_id = TxnId(response.header.txn_id);
482                            if let Some(responser) = pending_txns.remove(&txn_id) {
483                                responser.send(Ok(response)).ignore();
484                            }
485                        }
486                    },
487                    Err(e) => {
488                        if e.kind() == io::ErrorKind::WouldBlock {
489                            continue;
490                        }
491                        let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie read failed").cause_by(e);
492                        break err;
493                    },
494                }
495            },
496            r = request_receiver.recv(), if pending_requests.len() < 512 => {
497                match r {
498                    Some(request) => {
499                        pending_requests.push_back(request);
500                    },
501                    None => return,
502                };
503            },
504            _ = sock.writable(), if !pending_requests.is_empty() => {
505                io_slices.clear();
506                io_slices.push(std::io::IoSlice::new(unsafe {&*(&pending_requests[0].bytes[first_request_offset..] as *const [u8])}));
507                pending_requests.iter().skip(1).for_each(|r| {
508                    io_slices.push(std::io::IoSlice::new(unsafe {&*(&r.bytes[first_request_offset..] as *const [u8])}));
509                });
510                match sock.try_write_vectored(&io_slices) {
511                    Ok(mut n) => {
512                        if n < pending_requests[0].bytes.len() - first_request_offset {
513                            first_request_offset += n;
514                            continue;
515                        }
516                        n -= pending_requests[0].bytes.len() - first_request_offset;
517                        let MessageRequest{txn_id, responser, bytes} = pending_requests.pop_front().unwrap();
518                        pending_txns.insert(txn_id, responser);
519                        state.release_buf(bytes);
520                        first_request_offset = 0;
521                        while let Some(request) = pending_requests.front() {
522                            if n < request.bytes.len() {
523                                first_request_offset = n;
524                                break;
525                            }
526                            let MessageRequest{txn_id, responser, bytes} = pending_requests.pop_front().unwrap();
527                            n -= bytes.len();
528                            pending_txns.insert(txn_id, responser);
529                            state.release_buf(bytes);
530                        }
531                    },
532                    Err(e) => {
533                        if e.kind() == io::ErrorKind::WouldBlock {
534                            continue;
535                        }
536                        let err = BkError::with_description(ErrorKind::BookieNotAvailable, &"bookie write failed").cause_by(e);
537                        break err;
538                    }
539                }
540            },
541        }
542    };
543    // Set error state before closing receiver to shape happens-before relation:
544    // set-error ==> receiver closed ==> sending failed ==> get-error
545    unsafe { state.set_err(err.clone()) };
546    request_receiver.close();
547    pending_requests.into_iter().for_each(|MessageRequest { responser, .. }| responser.send(Err(err.clone())).ignore());
548    loop {
549        let MessageRequest { responser, .. } = match request_receiver.recv().await {
550            None => break,
551            Some(request) => request,
552        };
553        responser.send(Err(err.clone())).ignore();
554    }
555}
556
557struct State {
558    txn_id_counter: AtomicU64,
559    err: AtomicPtr<BkError>,
560    buffers: Mutex<Vec<Vec<u8>>>,
561}
562
563impl State {
564    fn new() -> State {
565        State {
566            txn_id_counter: AtomicU64::new(1),
567            err: AtomicPtr::new(std::ptr::null_mut()),
568            buffers: Mutex::new(Vec::with_capacity(512)),
569        }
570    }
571
572    fn next_txn_id(&self) -> TxnId {
573        let txn_id = self.txn_id_counter.fetch_add(1, Ordering::Relaxed);
574        TxnId::from(txn_id)
575    }
576
577    fn release_buf(&self, mut buf: Vec<u8>) {
578        buf.clear();
579        let mut buffers = self.buffers.lock().unwrap();
580        buffers.push(buf);
581    }
582
583    fn get_buf(&self) -> Vec<u8> {
584        let mut buffers = self.buffers.lock().unwrap();
585        let buf = buffers.pop();
586        drop(buffers);
587        buf.unwrap_or_else(|| Vec::with_capacity(1024))
588    }
589
590    unsafe fn get_err(&self) -> Option<BkError> {
591        let ptr = self.err.load(Ordering::Relaxed);
592        if ptr.is_null() {
593            return None;
594        }
595        let err = &mut *ptr;
596        Some(err.clone())
597    }
598
599    unsafe fn set_err(&self, err: BkError) {
600        let err = Box::into_raw(Box::new(err));
601        self.err.store(err, Ordering::Relaxed);
602    }
603}
604
605impl Drop for State {
606    fn drop(&mut self) {
607        let err = *self.err.get_mut();
608        if err.is_null() {
609            return;
610        }
611        let _ = unsafe { Box::from_raw(err) };
612    }
613}
614
/// Connection to a single bookie; requests are tagged with transaction ids and multiplexed over
/// one TCP stream driven by an `io_loop` task.
pub struct Client {
    /// State shared with the connection's `io_loop` task.
    state: Arc<State>,
    /// Queue of serialized requests consumed by `io_loop`.
    requester: mpsc::Sender<MessageRequest>,
}

/// Client connection options; currently empty and ignored by `Client::connect`.
pub struct Configuration {}
621
622enum ClientRequest {
623    Create { bookie_id: BookieId, responser: oneshot::Sender<Result<Arc<Client>>> },
624}
625
626impl std::fmt::Debug for ClientRequest {
627    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
628        match self {
629            ClientRequest::Create { bookie_id, .. } => write!(f, "client creation request for {}", bookie_id),
630        }
631    }
632}
633
634async fn connect(registry: BookieRegistrySnapshot, bookie_id: BookieId) -> (BookieId, Result<Client>) {
635    let bookie = match registry.get_service_info(&bookie_id) {
636        None => return (bookie_id, Err(BkError::with_description(ErrorKind::BookieNotAvailable, &"no service info"))),
637        Some(bookie) => bookie,
638    };
639    let endpoint = match bookie.rpc_endpoint() {
640        None => return (bookie_id, Err(BkError::with_description(ErrorKind::BookieNotAvailable, &"no rpc endpoint"))),
641        Some(endpoint) => endpoint,
642    };
643    let client = Client::connect((endpoint.host.as_str(), endpoint.port), &Configuration {}).await;
644    (bookie_id, client)
645}
646
647fn get_client(clients: &RwLock<HashMap<BookieId, Arc<Client>>>, bookie_id: &BookieId) -> Option<Arc<Client>> {
648    let clients = clients.read().unwrap();
649    return clients.get(bookie_id).cloned();
650}
651
652async fn client_request_loop(
653    clients: Arc<RwLock<HashMap<BookieId, Arc<Client>>>>,
654    mut requester: mpsc::UnboundedReceiver<ClientRequest>,
655    registry: BookieRegistry,
656) {
657    let mut pending_requests: HashMap<BookieId, Vec<_>> = HashMap::new();
658    let mut connecting_clients = VecDeque::new();
659    let mut registry_snapshot = registry.snapshot();
660    let mut released_responsers = Vec::new();
661    loop {
662        select! {
663            r = requester.recv() => {
664                let ClientRequest::Create {bookie_id, responser} = match r {
665                    None => break,
666                    Some(request) => request,
667                };
668                if let Some(client) = get_client(&clients, &bookie_id) {
669                    responser.send(Ok(client)).ignore();
670                    continue;
671                }
672                let entry = pending_requests.entry(bookie_id.clone());
673                let responsers = entry.or_insert_with(|| released_responsers.pop().unwrap_or_default());
674                if responsers.is_empty() {
675                    registry.update(&mut registry_snapshot);
676                    connecting_clients.push_back(connect(registry_snapshot.clone(), bookie_id).fuse());
677                }
678                responsers.push(responser);
679            },
680            (i, (bookie_id, r)) = SelectIterable::next(&mut connecting_clients) => {
681                connecting_clients.swap_remove_back(i);
682                let (_, mut responsers) = pending_requests.remove_entry(&bookie_id).unwrap();
683                let result = r.map(Arc::new);
684                responsers.drain(..).for_each(|responser| {
685                    responser.send(result.clone()).ignore();
686                });
687                if let Ok(client) = result {
688                    clients.write().unwrap().insert(bookie_id, client);
689                }
690                released_responsers.push(responsers);
691            },
692        }
693    }
694}
695
696pub(crate) struct PoolledClient {
697    clients: Arc<RwLock<HashMap<BookieId, Arc<Client>>>>,
698    requester: mpsc::UnboundedSender<ClientRequest>,
699}
700
701impl PoolledClient {
702    pub fn new(bookie_registory: BookieRegistry) -> PoolledClient {
703        let clients = Arc::new(RwLock::new(HashMap::with_capacity(512)));
704        let (sender, receiver) = mpsc::unbounded_channel();
705        tokio::spawn(client_request_loop(clients.clone(), receiver, bookie_registory));
706        PoolledClient { clients, requester: sender }
707    }
708
709    pub fn prepare_ensemble(&self, bookies: &[BookieId]) {
710        let mut has_clients = vec![false; bookies.len()];
711        let clients = self.clients.read().unwrap();
712        for (i, bookie_id) in bookies.iter().enumerate() {
713            has_clients[i] = clients.contains_key(bookie_id);
714        }
715        drop(clients);
716        for (i, has) in has_clients.iter().enumerate() {
717            if !has {
718                let bookie_id = &bookies[i];
719                let (sender, _) = oneshot::channel();
720                self.requester.send(ClientRequest::Create { bookie_id: bookie_id.clone(), responser: sender }).unwrap();
721            }
722        }
723    }
724
725    fn get_client(&self, bookie_id: &BookieId) -> Option<Arc<Client>> {
726        let clients = self.clients.read().unwrap();
727        clients.get(bookie_id.as_str()).cloned()
728    }
729
730    async fn create_or_get_client(&self, bookie_id: &BookieId) -> Result<Arc<Client>> {
731        if let Some(client) = self.get_client(bookie_id) {
732            return Ok(client);
733        }
734        let (sender, receiver) = oneshot::channel();
735        self.requester.send(ClientRequest::Create { bookie_id: bookie_id.to_owned(), responser: sender }).unwrap();
736        receiver.await.unwrap()
737    }
738
739    #[allow(dead_code)]
740    fn remove_client(&self, bookie_id: &BookieId, client: Arc<Client>) {
741        let mut clients = self.clients.write().unwrap();
742        match clients.get(bookie_id) {
743            Some(other) if Arc::ptr_eq(&client, other) => clients.remove(bookie_id),
744            _ => return,
745        };
746        drop(clients);
747    }
748
749    pub async fn read_last_entry(
750        &self,
751        bookie_id: &BookieId,
752        ledger_id: LedgerId,
753        options: &ReadOptions<'_>,
754    ) -> Result<(EntryId, FetchedEntry)> {
755        let client = self.create_or_get_client(bookie_id).await?;
756        client.read_last_entry(ledger_id, options).await
757    }
758
759    pub async fn read_entry(
760        &self,
761        bookie_id: &BookieId,
762        ledger_id: LedgerId,
763        entry_id: EntryId,
764        options: &ReadOptions<'_>,
765    ) -> Result<FetchedEntry> {
766        let client = self.create_or_get_client(bookie_id).await?;
767        client.read_entry(ledger_id, entry_id, options).await
768    }
769
770    pub async fn poll_entry(
771        &self,
772        bookie_id: &BookieId,
773        ledger_id: LedgerId,
774        entry_id: EntryId,
775        options: &PollOptions<'_>,
776    ) -> Result<PolledEntry> {
777        let client = self.create_or_get_client(bookie_id).await?;
778        client.poll_entry(ledger_id, entry_id, options).await
779    }
780
781    pub async fn write_lac(
782        &self,
783        bookie_id: &BookieId,
784        ledger_id: LedgerId,
785        explicit_lac: EntryId,
786        options: &WriteLacOptions<'_>,
787    ) -> Result<()> {
788        let client = self.create_or_get_client(bookie_id).await?;
789        client.write_lac(ledger_id, explicit_lac, options).await
790    }
791
792    pub async fn read_lac(
793        &self,
794        bookie_id: &BookieId,
795        ledger_id: LedgerId,
796        digest_algorithm: &DigestAlgorithm,
797    ) -> Result<EntryId> {
798        let client = self.create_or_get_client(bookie_id).await?;
799        client.read_lac(ledger_id, digest_algorithm).await
800    }
801
802    pub async fn force_ledger(&self, bookie_id: &BookieId, ledger_id: LedgerId) -> Result<()> {
803        let client = self.create_or_get_client(bookie_id).await?;
804        client.force_ledger(ledger_id).await
805    }
806
807    pub async fn add_entry(
808        &self,
809        bookie_id: &BookieId,
810        entry: &AddingEntry<'_>,
811        options: &AddOptions<'_>,
812    ) -> Result<()> {
813        let client = self.create_or_get_client(bookie_id).await?;
814        client.add_entry(entry, options).await
815    }
816}
817
818impl Client {
819    pub async fn connect<A: ToSocketAddrs>(addr: A, _conf: &Configuration) -> Result<Client> {
820        impl From<io::Error> for BkError {
821            fn from(_: io::Error) -> BkError {
822                BkError::with_description(ErrorKind::BookieNotAvailable, &"can't connect to bookie")
823            }
824        }
825        let tcp_stream = TcpStream::connect(addr).await?;
826        let state = Arc::new(State::new());
827        let (sender, receiver) = mpsc::channel(512);
828        tokio::spawn(io_loop(tcp_stream, state.clone(), receiver));
829        let client = Client { state, requester: sender };
830        Ok(client)
831    }
832
833    fn serialize_request(&self, request: &Request) -> Result<Vec<u8>> {
834        let mut buf = self.state.get_buf();
835        unsafe { buf.set_len(4) };
836        if let Err(err) = request.encode(&mut buf) {
837            return Err(BkError::with_description(ErrorKind::UnexpectedError, &"fail to encode request").cause_by(err));
838        }
839        let len = buf.len() as u32 - 4;
840        let mut len_buf = &mut buf[..4];
841        len_buf.put_u32(len);
842        Ok(buf)
843    }
844
845    async fn send(&self, txn_id: TxnId, request: &Request) -> Result<oneshot::Receiver<Result<Response>>> {
846        let bytes = self.serialize_request(request)?;
847        let (sender, receiver) = oneshot::channel();
848        let message = MessageRequest { txn_id, bytes, responser: sender };
849        if self.requester.send(message).await.is_err() {
850            let err = unsafe { self.state.get_err().expect("no error after message channel closed") };
851            return Err(err);
852        }
853        Ok(receiver)
854    }
855
856    async fn request<T: Operation>(&self, operation: T) -> Result<T::Response> {
857        let txn_id = self.state.next_txn_id();
858        let request = operation.request_of(txn_id);
859        let receiver = self.send(txn_id, &request).await?;
860        let response = match receiver.await.expect("no response after message sent") {
861            Ok(response) => response,
862            Err(err) => return Err(err),
863        };
864        T::response_of(response)
865    }
866
867    pub async fn fetch_entry_internal(
868        &self,
869        ledger_id: LedgerId,
870        entry_id: EntryId,
871        options: &ReadOptions<'_>,
872    ) -> Result<(EntryId, FetchedEntry)> {
873        let flag = if options.fence_ledger { Some(read_request::Flag::FenceLedger.into()) } else { None };
874        let read = ReadRequest {
875            flag,
876            ledger_id: ledger_id.into(),
877            entry_id: entry_id.into(),
878            master_key: options.master_key.map(|key| key.to_owned()),
879            ..Default::default()
880        };
881        let response = self.request(read).await?;
882        let max_lac = response.max_lac.unwrap_or(INVALID_ENTRY_ID);
883        let Some(body) = response.body else {
884            return Err(BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no entry body in response"));
885        };
886        let body = EntryBody(body);
887        let digester = options.digest_algorithm.digester();
888        let EntryContent { entry_id: actual_entry_id, payload, last_add_confirmed, ledger_length } =
889            body.extract_entry(ledger_id, entry_id, digester)?;
890        Ok((actual_entry_id, FetchedEntry {
891            last_add_confirmed,
892            max_lac: last_add_confirmed.max(EntryId(max_lac)),
893            ledger_length,
894            payload,
895        }))
896    }
897
898    pub async fn read_entry(
899        &self,
900        ledger_id: LedgerId,
901        entry_id: EntryId,
902        options: &ReadOptions<'_>,
903    ) -> Result<FetchedEntry> {
904        let (_, entry) = self.fetch_entry_internal(ledger_id, entry_id, options).await?;
905        Ok(entry)
906    }
907
908    pub async fn read_last_entry(
909        &self,
910        ledger_id: LedgerId,
911        options: &ReadOptions<'_>,
912    ) -> Result<(EntryId, FetchedEntry)> {
913        self.fetch_entry_internal(ledger_id, LAST_ADD_CONFIRMED, options).await
914    }
915
916    pub async fn poll_entry(
917        &self,
918        ledger_id: LedgerId,
919        entry_id: EntryId,
920        options: &PollOptions<'_>,
921    ) -> Result<PolledEntry> {
922        let previous_lac = entry_id - 1;
923        let read = ReadRequest {
924            flag: Some(read_request::Flag::EntryPiggyback.into()),
925            ledger_id: ledger_id.into(),
926            entry_id: LAST_ADD_CONFIRMED.into(),
927            previous_lac: Some(previous_lac.into()),
928            time_out: Some(options.timeout.as_millis() as i64),
929            ..Default::default()
930        };
931        let response = self.request(read).await?;
932        let Some(max_lac) = response.max_lac else {
933            let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no last add confirmed");
934            return Err(err);
935        };
936        if response.entry_id == LAST_ADD_CONFIRMED.0 {
937            return Ok(PolledEntry { last_add_confirmed: EntryId(max_lac), payload: None });
938        }
939        let Some(body) = response.body else {
940            let err = BkError::with_description(ErrorKind::BookieUnexpectedResponse, &"no entry body in response");
941            return Err(err);
942        };
943        let body = EntryBody(body);
944        let digester = options.digest_algorithm.digester();
945        let EntryContent { payload, last_add_confirmed, .. } = body.extract_entry(ledger_id, entry_id, digester)?;
946        Ok(PolledEntry { last_add_confirmed: last_add_confirmed.max(EntryId(max_lac)), payload: Some(payload) })
947    }
948
949    pub async fn write_lac(
950        &self,
951        ledger_id: LedgerId,
952        explicit_lac: EntryId,
953        options: &WriteLacOptions<'_>,
954    ) -> Result<()> {
955        let mut digester = options.digest_algorithm.digester();
956        let mut body = Vec::with_capacity(LAC_METADATA_LENGTH + digester.digest_length());
957        body.put_i64(ledger_id.into());
958        body.put_i64(explicit_lac.into());
959        digester.update(&body);
960        digester.digest(&mut body);
961        let request = WriteLacRequest {
962            ledger_id: ledger_id.into(),
963            lac: explicit_lac.into(),
964            master_key: options.master_key.to_vec(),
965            body,
966        };
967        self.request(request).await?;
968        Ok(())
969    }
970
971    pub async fn read_lac(&self, ledger_id: LedgerId, digest_algorithm: &DigestAlgorithm) -> Result<EntryId> {
972        let request = ReadLacRequest { ledger_id: ledger_id.into() };
973        let response = self.request(request).await?;
974        let explicit_lac = if let Some(body) = response.lac_body {
975            let mut body = EntryBody(body);
976            body.extract_explicit_lac(ledger_id, digest_algorithm.digester())?
977        } else {
978            EntryId::INVALID
979        };
980        let piggyback_lac = if let Some(body) = response.last_entry_body {
981            let mut body = EntryBody(body);
982            body.extract_piggyback_lac(ledger_id, EntryId::INVALID, digest_algorithm.digester())?
983        } else {
984            EntryId::INVALID
985        };
986        Ok(explicit_lac.max(piggyback_lac))
987    }
988
989    pub async fn force_ledger(&self, ledger_id: LedgerId) -> Result<()> {
990        let request = ForceLedgerRequest { ledger_id: ledger_id.into() };
991        let _response = self.request(request).await?;
992        Ok(())
993    }
994
995    pub async fn add_entry(&self, entry: &AddingEntry<'_>, options: &AddOptions<'_>) -> Result<()> {
996        let mut digester = options.digest_algorithm.digester();
997        let mut body = Vec::with_capacity(ENTRY_METADATA_LENGTH + digester.digest_length() + entry.payload.len());
998        let buf = &mut body;
999        buf.put_i64(entry.ledger_id.into());
1000        buf.put_i64(entry.entry_id.into());
1001        buf.put_i64(entry.last_add_confirmed.into());
1002        buf.put_i64(entry.accumulated_ledger_length.into());
1003        digester.update(&body);
1004        digester.update(entry.payload);
1005        digester.digest(&mut body);
1006        body.extend_from_slice(entry.payload);
1007
1008        let mut request = AddRequest {
1009            ledger_id: entry.ledger_id.into(),
1010            entry_id: entry.entry_id.into(),
1011            body,
1012            master_key: Vec::from(options.master_key),
1013            ..Default::default()
1014        };
1015        if options.deferred_sync {
1016            request.write_flags = Some(DEFERRED_WRITE);
1017        }
1018        if options.recovery_add {
1019            request.flag = Some(add_request::Flag::RecoveryAdd.into());
1020        }
1021        self.request(request).await?;
1022        Ok(())
1023    }
1024}