1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::fmt::Write as _;
4use std::mem::MaybeUninit;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use compact_str::CompactString;
10use either::Either;
11use ignore_result::Ignore;
12use log::debug;
13use uuid::Uuid;
14use zookeeper_client as zk;
15
16use super::serde;
17use super::types::{
18 self,
19 BookieRegistrationClient,
20 BookieServiceInfo,
21 BookieUpdate,
22 BookieUpdateStream,
23 LedgerIdStoreClient,
24 LedgerMetadataStoreClient,
25 LedgerMetadataStream,
26 MetaStore,
27 MetaVersion,
28 Versioned,
29};
30use crate::client::errors::{BkError, BkResult, ErrorKind};
31use crate::client::service_uri::ServiceUri;
32use crate::client::{BookieId, LedgerId, LedgerMetadata};
33
34#[derive(Clone)]
35pub struct ZkConfiguration {
36 ledger_id_format: Option<LedgerIdFormat>,
37 root: CompactString,
38 connect: CompactString,
39 timeout: Duration,
40}
41
42impl ZkConfiguration {
43 pub fn from_service_uri(uri: ServiceUri) -> Result<ZkConfiguration, BkError> {
44 assert_eq!(uri.scheme, "zk");
45 let ServiceUri { spec, address, path, .. } = uri;
46 let ledger_id_format = match spec.as_str() {
47 "" | "null" => None,
48 "flat" => Some(LedgerIdFormat::Flat),
49 "hierarchical" => Some(LedgerIdFormat::Hierarchical),
50 "longhierarchical" => Some(LedgerIdFormat::LongHierarchical),
51 _ => {
52 return Err(BkError::with_message(
53 ErrorKind::InvalidServiceUri,
54 format!("unknown ledger id format {}", spec),
55 ))
56 },
57 };
58 let config =
59 ZkConfiguration { ledger_id_format, root: path, connect: address, timeout: Duration::from_secs(10) };
60 Ok(config)
61 }
62}
63
/// Internal failures raised while waiting for a usable ZooKeeper session.
enum Error {
    /// The session left the connected/disconnected states and can no longer be used.
    SessionExpired,
}
67
68impl From<Error> for BkError {
69 fn from(err: Error) -> BkError {
70 use Error::*;
71 match err {
72 SessionExpired => BkError::with_description(ErrorKind::MetaClientError, &"zk session expired"),
73 }
74 }
75}
76
77struct ZkClient {
78 client: zk::Client,
79}
80
81impl ZkClient {
82 fn terminated(&self) -> bool {
83 let state = self.client.state();
84 state.is_terminated()
85 }
86
87 async fn connected(&self) -> Result<(), Error> {
88 let mut watcher = self.client.state_watcher();
89 let mut state = watcher.state();
90 loop {
91 match state {
92 zk::SessionState::SyncConnected => return Ok(()),
93 zk::SessionState::Disconnected => {},
94 _ => return Err(Error::SessionExpired),
95 };
96 state = watcher.changed().await;
97 }
98 }
99}
100
101impl std::ops::Deref for ZkClient {
102 type Target = zk::Client;
103
104 fn deref(&self) -> &zk::Client {
105 &self.client
106 }
107}
108
109impl From<zk::Stat> for MetaVersion {
110 fn from(stat: zk::Stat) -> MetaVersion {
111 MetaVersion(stat.mzxid)
112 }
113}
114
115#[async_trait]
116impl BookieRegistrationClient for ZkMetaStore {
117 async fn watch_readable_bookies(
118 &mut self,
119 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
120 let mut client = self.get_connected_client().await?;
121 client.watch_bookies("/available/readonly", "").await
122 }
123
124 async fn watch_writable_bookies(
125 &mut self,
126 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
127 let mut client = self.get_connected_client().await?;
128 client.watch_bookies("/available", "readonly").await
129 }
130}
131
132#[async_trait]
133impl LedgerIdStoreClient for ZkMetaStore {
134 async fn generate_ledger_id(&self) -> Result<LedgerId, BkError> {
135 let mut client = self.get_connected_client().await?;
136 client.generate_ledger_id().await
137 }
138}
139
140impl From<zk::Error> for BkError {
141 fn from(err: zk::Error) -> BkError {
142 let kind = match err {
143 zk::Error::BadVersion => ErrorKind::MetaVersionMismatch,
144 _ => ErrorKind::MetaClientError,
145 };
146 BkError::new(kind).cause_by(err)
147 }
148}
149
150struct ZkSessionClient {
151 client: Arc<ZkClient>,
152 counter: usize,
153 session: String,
154}
155
156struct ZkClientManager {
157 config: ZkConfiguration,
158 client: tokio::sync::Mutex<ZkSessionClient>,
159}
160
161impl ZkClientManager {
162 fn new(config: ZkConfiguration, client: Arc<ZkClient>) -> ZkClientManager {
163 let client = ZkSessionClient { client, counter: 0, session: Uuid::new_v4().to_string() };
164 ZkClientManager { config, client: tokio::sync::Mutex::new(client) }
165 }
166
167 async fn replace_client(&self, client: &Arc<ZkClient>) -> Result<Arc<ZkClient>, BkError> {
168 let mut guard = self.client.lock().await;
169 if !Arc::ptr_eq(client, &guard.client) {
170 return Ok(guard.client.clone());
171 }
172 let zookeeper =
173 zk::Client::builder().with_session_timeout(self.config.timeout).connect(&self.config.connect).await?;
174 let client = Arc::new(ZkClient { client: zookeeper });
175 guard.session.truncate(36);
176 guard.counter += 1;
177 let counter = guard.counter;
178 write!(&mut guard.session, "-{}", counter).ignore();
179 let options = zk::CreateMode::Ephemeral.with_acls(zk::Acls::creator_all());
180 client.create(&guard.session, Default::default(), &options).await?;
182 guard.client = client.clone();
183 Ok(client)
184 }
185}
186
187struct ZkLedgerMetadataStream {
188 watcher: Option<zk::OneshotWatcher>,
189 version: i64,
190 updated: i64,
191 ledger_id: LedgerId,
192 ledger_path: String,
193 client: Arc<ZkClient>,
194 manager: Arc<ZkClientManager>,
195}
196
197impl ZkLedgerMetadataStream {
198 async fn new_client(&mut self) -> Result<(), BkError> {
199 let client = self.manager.replace_client(&self.client).await?;
200 self.client = client;
201 Ok(())
202 }
203
204 async fn connected(&mut self) -> Result<(), BkError> {
205 match self.client.connected().await {
206 Ok(_) => Ok(()),
207 Err(Error::SessionExpired) => self.new_client().await,
208 }
209 }
210
211 async fn watch_exists(&mut self) -> Result<(Option<zk::Stat>, zk::OneshotWatcher), zk::Error> {
212 self.client.check_and_watch_stat(&self.ledger_path).await
213 }
214}
215
216#[async_trait]
217impl LedgerMetadataStream for ZkLedgerMetadataStream {
218 async fn cancel(&mut self) {}
219
220 async fn next(&mut self) -> Result<Versioned<LedgerMetadata>, BkError> {
221 loop {
222 if self.updated > self.version {
223 match self.client.get_data(&self.ledger_path).await {
224 Ok((data, stat)) if stat.mzxid > self.version => {
225 self.version = stat.mzxid;
226 let metadata = serde::deserialize_ledger_metadata(self.ledger_id, &data)?;
227 return Ok(Versioned::new(stat, metadata));
228 },
229 Err(zk::Error::NoNode) => {
230 self.version = self.updated;
231 return Err(BkError::new(ErrorKind::LedgerNotExisted));
232 },
233 _ => {},
234 };
235 }
236 if let Some(watcher) = self.watcher.take() {
237 watcher.changed().await;
238 }
239 match self.watch_exists().await {
240 Ok(r) => {
241 let (stat, watcher) = r;
242 self.watcher = Some(watcher);
243 if let Some(stat) = stat {
244 self.updated = self.updated.max(stat.mzxid);
245 continue;
246 }
247 return Err(BkError::new(ErrorKind::LedgerNotExisted));
248 },
249 Err(zk::Error::ConnectionLoss) => {
250 self.connected().await.ignore();
251 },
252 Err(zk::Error::SessionExpired) => {
253 self.new_client().await?;
254 },
255 Err(e) => {
256 return Err(BkError::new(ErrorKind::MetaClientError).cause_by(e));
257 },
258 };
259 }
260 }
261}
262
263struct ZkBookieUpdateStream {
264 path: &'static str,
265 exclude: &'static str,
266 scratch: String,
267 client: Arc<ZkClient>,
268 manager: Arc<ZkClientManager>,
269 watcher: zk::PersistentWatcher,
270 expired: bool,
271}
272
273impl ZkBookieUpdateStream {
274 async fn new_client(&mut self) -> Result<(), BkError> {
275 let client = self.manager.replace_client(&self.client).await?;
276 self.client = client;
277 Ok(())
278 }
279
280 async fn connected(&mut self) -> Result<(), BkError> {
281 match self.client.connected().await {
282 Ok(_) => Ok(()),
283 Err(_) => self.new_client().await,
284 }
285 }
286
287 async fn get_bookies(
288 client: &ZkClient,
289 path: &str,
290 children: &[String],
291 exclude: &str,
292 scratch: &mut String,
293 ) -> Result<Vec<BookieServiceInfo>, BkError> {
294 let mut bookies = Vec::with_capacity(children.len());
295 for bookie_path in children.iter() {
296 if !exclude.is_empty() && bookie_path.starts_with(exclude) {
297 continue;
298 }
299 scratch.clear();
300 write!(scratch, "{}/{}", path, &bookie_path).ignore();
301 let data = match client.get_data(scratch).await {
302 Err(zk::Error::NoNode) => continue,
303 Err(err) => return Err(From::from(err)),
304 Ok((data, _)) => data,
305 };
306 let bookie_id = BookieId::new(bookie_path);
307 let bookie_result = if data.is_empty() {
308 BookieServiceInfo::from_legacy(bookie_id)
309 } else {
310 BookieServiceInfo::from_protobuf(bookie_id, &data)
311 };
312 match bookie_result {
313 Err(_) => continue,
314 Ok(bookie) => bookies.push(bookie),
315 }
316 }
317 Ok(bookies)
318 }
319
320 async fn rebuild(
321 client: &ZkClient,
322 path: &str,
323 exclude: &str,
324 scratch: &mut String,
325 ) -> Result<(Vec<BookieServiceInfo>, zk::PersistentWatcher), BkError> {
326 let watcher = client.watch(path, zk::AddWatchMode::PersistentRecursive).await?;
327 let children = match client.list_children(path).await {
328 Err(zk::Error::NoNode) => Default::default(),
329 Err(err) => return Err(From::from(err)),
330 Ok(children) => children,
331 };
332 let bookies = Self::get_bookies(client, path, &children, exclude, scratch).await?;
333 Ok((bookies, watcher))
334 }
335
336 async fn on_session_event(&mut self, state: zk::SessionState) -> Result<(), BkError> {
337 if state == zk::SessionState::Disconnected {
338 self.connected().await?;
339 } else if state.is_terminated() {
340 self.new_client().await?;
341 self.expired = true;
342 }
343 Ok(())
344 }
345}
346
347#[async_trait]
348impl BookieUpdateStream for ZkBookieUpdateStream {
349 async fn next(&mut self) -> BkResult<BookieUpdate> {
350 loop {
351 if self.expired {
352 let (bookies, watcher) =
353 Self::rebuild(&self.client, self.path, self.exclude, &mut self.scratch).await?;
354 self.watcher = watcher;
355 self.expired = false;
356 return Ok(BookieUpdate::Reconstruction(bookies));
357 }
358 let event = self.watcher.changed().await;
359 if event.event_type == zk::EventType::Session {
360 self.on_session_event(event.session_state).await?;
361 continue;
362 } else if event.event_type != zk::EventType::NodeChildrenChanged {
363 continue;
364 }
365 let bookie_path = match event.path.strip_prefix(self.path).and_then(|path| path.strip_prefix('/')) {
366 None => continue,
367 Some(bookie_path) => bookie_path,
368 };
369 if bookie_path == self.exclude {
370 continue;
371 }
372 let bookie_id = BookieId::new(bookie_path);
373 if event.event_type == zk::EventType::NodeDeleted {
374 return Ok(BookieUpdate::Remove(bookie_id));
375 }
376 let (data, _) = match self.client.get_data(&event.path).await {
377 Err(_) => continue,
378 Ok(result) => result,
379 };
380 let bookie_result = if data.is_empty() {
381 BookieServiceInfo::from_legacy(bookie_id.clone())
382 } else {
383 BookieServiceInfo::from_protobuf(bookie_id.clone(), &data)
384 };
385 let bookie = match bookie_result {
386 Err(err) => {
387 debug!("fail to parse bookie info for bookie id {}: {}", bookie_id, err);
388 continue;
389 },
390 Ok(bookie) => bookie,
391 };
392 return Ok(BookieUpdate::Add(bookie));
393 }
394 }
395}
396
397struct ZkMetaClientGuard<'a> {
398 client: MaybeUninit<ZkMetaClient>,
399 store: &'a ZkMetaStore,
400}
401
402impl std::ops::Deref for ZkMetaClientGuard<'_> {
403 type Target = ZkMetaClient;
404
405 fn deref(&self) -> &ZkMetaClient {
406 unsafe { &*self.client.as_ptr() }
407 }
408}
409
410impl std::ops::DerefMut for ZkMetaClientGuard<'_> {
411 fn deref_mut(&mut self) -> &mut ZkMetaClient {
412 unsafe { &mut *self.client.as_mut_ptr() }
413 }
414}
415
416impl Drop for ZkMetaClientGuard<'_> {
417 fn drop(&mut self) {
418 let client = unsafe { self.client.as_ptr().read() };
419 self.store.release_client(client);
420 }
421}
422
423#[derive(Copy, Clone, PartialEq, Eq, strum::Display)]
424enum LedgerIdFormat {
425 #[strum(serialize = "flat")]
426 Flat,
427 #[strum(serialize = "hierarchical")]
428 Hierarchical,
429 #[strum(serialize = "longhierarchical")]
430 LongHierarchical,
431}
432
433impl LedgerIdFormat {
434 fn try_from_ledger_layout(data: Vec<u8>) -> Result<LedgerIdFormat, BkError> {
435 let Ok(s) = String::from_utf8(data) else {
436 return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"ledger layout is not utf8"));
437 };
438 let mut lines = s.split('\n');
439 let Ok(format_version) = lines.next().unwrap().parse::<i32>() else {
440 return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"invalid ledger layout format version"));
441 };
442 if format_version != 1 && format_version != 2 {
443 let msg = format!("unsupported ledger layout format version {}", format_version);
444 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
445 }
446 let Some(manager_line) = lines.next() else {
447 return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"no ledger manager in ledger layout"));
448 };
449 let mut manager_splits = manager_line.split(':');
450 let manager_class = manager_splits.next().unwrap();
451 let Some(manager_version) = manager_splits.next() else {
452 return Err(BkError::with_description(
453 ErrorKind::MetaInvalidData,
454 &"no ledger manager version in ledger layout",
455 ));
456 };
457 if manager_version.parse::<i32>().is_err() {
458 return Err(BkError::with_description(
459 ErrorKind::MetaInvalidData,
460 &"invalid ledger manager version in ledger layout",
461 ));
462 };
463 if format_version == 1 {
464 if manager_class == "flat" {
465 return Ok(LedgerIdFormat::Flat);
466 } else if manager_class == "hierarchical" {
467 return Ok(LedgerIdFormat::Hierarchical);
468 }
469 let msg =
470 format!("unknown ledger manager type {} in ledger layout version {}", manager_class, format_version);
471 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
472 }
473 if manager_class == "org.apache.bookkeeper.meta.FlatLedgerManagerFactory" {
474 return Ok(LedgerIdFormat::Flat);
475 } else if manager_class == "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory" {
476 return Ok(LedgerIdFormat::Hierarchical);
477 } else if manager_class == "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory" {
478 return Ok(LedgerIdFormat::LongHierarchical);
479 }
480 let msg = format!("unknown ledger manager class {} in ledger layout version {}", manager_class, format_version);
481 Err(BkError::with_message(ErrorKind::MetaInvalidData, msg))
482 }
483}
484
485#[derive(Clone)]
486struct ZkMetaClient {
487 client: Arc<ZkClient>,
488 scratch: String,
489 ledger_root: CompactString,
490 ledger_id_format: LedgerIdFormat,
491 ledger_hob: i32,
492 ledger_path: String,
493 manager: Arc<ZkClientManager>,
494}
495
496impl ZkMetaClient {
497 fn new(
498 client: Arc<ZkClient>,
499 manager: Arc<ZkClientManager>,
500 ledger_id_format: LedgerIdFormat,
501 ledger_hob: i32,
502 ) -> ZkMetaClient {
503 ZkMetaClient {
504 client,
505 scratch: String::with_capacity(50),
506 ledger_root: Default::default(),
507 ledger_id_format,
508 ledger_hob,
509 ledger_path: String::with_capacity(50),
510 manager,
511 }
512 }
513
514 fn terminated(&self) -> bool {
515 self.client.terminated()
516 }
517
518 async fn check_expiration(&mut self) -> Result<(), BkError> {
519 if self.terminated() {
520 self.client = self.manager.replace_client(&self.client).await?;
521 }
522 Ok(())
523 }
524
525 fn format_flat_ledger_path(&mut self, ledger_id: LedgerId) {
526 write!(&mut self.ledger_path, "{}/L{:010}", &self.ledger_root, ledger_id).unwrap();
527 }
528
529 fn format_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
530 if ledger_id.0 < i32::MAX as i64 {
531 self.format_short_hierarchical_ledger_path(ledger_id);
532 } else {
533 self.format_long_hierarchical_ledger_path(ledger_id);
534 }
535 }
536
537 fn format_short_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
538 self.scratch.clear();
539 write!(&mut self.scratch, "{:010}", ledger_id).unwrap();
540 write!(
541 &mut self.ledger_path,
542 "{}/{}/{}/{}",
543 &self.ledger_root,
544 &self.scratch[..2],
545 &self.scratch[2..6],
546 &self.scratch[6..10]
547 )
548 .unwrap();
549 }
550
551 fn format_long_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
552 write!(&mut self.scratch, "{:019}", ledger_id).unwrap();
553 write!(
554 &mut self.ledger_path,
555 "{}/{}/{}/{}/{}/L{}",
556 &self.ledger_root,
557 &self.scratch[..3],
558 &self.scratch[3..7],
559 &self.scratch[7..11],
560 &self.scratch[11..15],
561 &self.scratch[15..19]
562 )
563 .unwrap();
564 }
565
566 fn format_ledger_path(&mut self, ledger_id: LedgerId) {
567 self.ledger_path.clear();
568 match self.ledger_id_format {
569 LedgerIdFormat::Flat => self.format_flat_ledger_path(ledger_id),
570 LedgerIdFormat::Hierarchical => self.format_hierarchical_ledger_path(ledger_id),
571 LedgerIdFormat::LongHierarchical => self.format_long_hierarchical_ledger_path(ledger_id),
572 };
573 }
574
575 async fn generate_flat_ledger_id(&mut self) -> Result<LedgerId, BkError> {
576 let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
577 let (_, sequence) = self.client.create("/ID-", Default::default(), &options).await?;
578 if sequence.into_i64() < 0 {
579 return Err(BkError::new(ErrorKind::LedgerIdOverflow));
580 }
581 Ok(LedgerId(sequence.into_i64()))
582 }
583
584 async fn generate_short_ledger_id(&self) -> Result<i64, BkError> {
585 let (_, sequence) =
586 self.create_path_optimistic("/idgen/ID-", Default::default(), zk::CreateMode::EphemeralSequential).await?;
587 Ok(sequence.into_i64())
588 }
589
590 async fn create_ledger_hob_directory(&mut self, i: i32) -> Result<(), zk::Error> {
591 write!(&mut self.scratch, "/idgen-long/{:010}", i).ignore();
592 self.create_directory_optimistic(&self.scratch).await
593 }
594
595 async fn generate_ledger_lob_id(&mut self, hob: i32) -> Result<i32, zk::Error> {
596 write!(&mut self.scratch, "/idgen-long/{:010}/ID-", hob).ignore();
597 let (_, sequence) =
598 self.create_path_optimistic(&self.scratch, Default::default(), zk::CreateMode::EphemeralSequential).await?;
599 Ok(sequence.into_i64() as i32)
600 }
601
602 fn extract_max_hob(children: &[String]) -> Result<i32, BkError> {
603 let mut max_hob = i32::MIN;
604 for child in children.iter() {
605 let hob_path = match child.strip_prefix("HOB-") {
606 None => {
607 let msg = format!("invalid ledger generation path {}", child);
608 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
609 },
610 Some(path) => path,
611 };
612 let hob = match hob_path.parse::<i32>() {
613 Err(_) => {
614 let msg = format!("invalid ledger generation path {}", child);
615 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
616 },
617 Ok(n) => n,
618 };
619 max_hob = max_hob.max(hob);
620 }
621 if max_hob <= 0 {
622 let msg = format!("no valid ledger generation path, last one is {}", children.last().unwrap());
623 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
624 }
625 Ok(max_hob)
626 }
627
628 async fn get_max_hob(client: &ZkClient) -> Result<i32, BkError> {
629 let children = match client.list_children("/idgen-long").await {
630 Err(zk::Error::NoNode) => return Ok(-1),
631 Err(err) => return Err(From::from(err)),
632 Ok(children) => children,
633 };
634 Self::extract_max_hob(&children)
635 }
636
637 async fn generate_ledger_hob(&mut self) -> Result<(), BkError> {
638 let mut children = self.client.list_children("/idgen-long").await?;
639 if children.is_empty() {
640 self.create_ledger_hob_directory(1).await?;
641 children = self.client.list_children("/idgen-long").await?;
642 }
643 if children.is_empty() {
644 return Err(BkError::new(ErrorKind::MetaConcurrentOperation));
645 }
646 self.ledger_hob = Self::extract_max_hob(&children)?;
647 Ok(())
648 }
649
650 async fn generate_long_ledger_id(&mut self) -> Result<LedgerId, BkError> {
651 loop {
652 if self.ledger_hob < 0 {
653 self.generate_ledger_hob().await?;
654 }
655 let hob = self.ledger_hob;
656 let lob = self.generate_ledger_lob_id(hob).await?;
657 if (0..i32::MAX).contains(&lob) {
658 let id = (hob as i64) << 32 | (lob as i64);
659 return Ok(LedgerId(id));
660 }
661 self.ledger_hob = -1;
662 }
663 }
664
665 async fn generate_ledger_id(&mut self) -> Result<LedgerId, BkError> {
666 if self.ledger_id_format == LedgerIdFormat::Flat {
667 return self.generate_flat_ledger_id().await;
668 }
669 if self.ledger_hob <= 0 {
670 let sequence = self.generate_short_ledger_id().await?;
671 if sequence >= 0 && sequence < i32::MAX as i64 {
672 return Ok(LedgerId(sequence));
673 }
674 self.create_directory_optimistic("/idgen-long").await?;
675 self.create_ledger_hob_directory(1).await?;
676 self.ledger_hob = 1;
677 }
678 self.generate_long_ledger_id().await
679 }
680
681 async fn create_ledger_metadata(&mut self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
682 let data = serde::serialize_ledger_metadata(metadata)?;
683 self.format_ledger_path(metadata.ledger_id);
684 let (stat, _) = self.create_path_optimistic(&self.ledger_path, &data, zk::CreateMode::Persistent).await?;
685 Ok(MetaVersion::from(stat))
686 }
687
688 #[async_recursion::async_recursion]
689 async fn create_directory_optimistic(&self, path: &str) -> Result<(), zk::Error> {
690 let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
691 loop {
692 let result = self.client.create(path, Default::default(), &options).await;
693 match result {
694 Err(zk::Error::NoNode) => {
695 let last_slash_index = path.rfind('/').unwrap();
696 let parent = &path[..last_slash_index];
697 self.create_directory_optimistic(parent).await?;
698 },
699 Err(zk::Error::NodeExists) => {
700 return Ok(());
701 },
702 Err(err) => return Err(err),
703 Ok(_) => return Ok(()),
704 }
705 }
706 }
707
708 async fn create_path_optimistic(
709 &self,
710 path: &str,
711 data: &[u8],
712 mode: zk::CreateMode,
713 ) -> Result<(zk::Stat, zk::CreateSequence), zk::Error> {
714 let options = mode.with_acls(zk::Acls::anyone_all());
715 loop {
716 let result = self.client.create(path, data, &options).await;
717 match result {
718 Err(zk::Error::NoNode) => {
719 let last_slash_index = path.rfind('/').unwrap();
720 let parent = &path[..last_slash_index];
721 self.create_directory_optimistic(parent).await?;
722 },
723 Err(err) => return Err(err),
724 Ok(result) => {
725 return Ok(result);
726 },
727 };
728 }
729 }
730
731 async fn read_metadata(
732 &self,
733 ledger_id: LedgerId,
734 ledger_path: &str,
735 ) -> Result<Versioned<LedgerMetadata>, BkError> {
736 let (data, stat) = match self.client.get_data(ledger_path).await {
737 Err(zk::Error::NoNode) => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
738 Err(err) => return Err(From::from(err)),
739 Ok(result) => result,
740 };
741 let metadata = serde::deserialize_ledger_metadata(ledger_id, &data)?;
742 Ok(Versioned::new(stat, metadata))
743 }
744
745 async fn read_ledger_metadata(&mut self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
746 self.format_ledger_path(ledger_id);
747 self.read_metadata(ledger_id, &self.ledger_path).await
748 }
749
750 async fn remove_ledger_metadata(
751 &mut self,
752 ledger_id: LedgerId,
753 expected_version: Option<MetaVersion>,
754 ) -> Result<(), BkError> {
755 self.format_ledger_path(ledger_id);
756 let xid = match expected_version {
757 None => return self.delete_ledger_path(&self.ledger_path, None).await,
758 Some(version) => version.0,
759 };
760 let stat = match self.client.check_stat(&self.ledger_path).await? {
761 None => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
762 Some(stat) => stat,
763 };
764 if stat.mzxid != xid {
765 return Err(BkError::new(ErrorKind::MetaVersionMismatch));
766 }
767 self.delete_ledger_path(&self.ledger_path, Some(stat.version)).await
768 }
769
770 async fn watch_ledger_metadata(
771 &mut self,
772 ledger_id: LedgerId,
773 start_version: MetaVersion,
774 ) -> BkResult<Box<dyn LedgerMetadataStream>> {
775 self.format_ledger_path(ledger_id);
776 let (stat, watcher) = match self.client.check_and_watch_stat(&self.ledger_path).await? {
777 (None, _) => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
778 (Some(stat), watcher) => (stat, watcher),
779 };
780 let metadata_stream = ZkLedgerMetadataStream {
781 watcher: Some(watcher),
782 version: start_version.into(),
783 updated: stat.mzxid,
784 ledger_id,
785 ledger_path: self.ledger_path.clone(),
786 client: self.client.clone(),
787 manager: self.manager.clone(),
788 };
789 Ok(Box::new(metadata_stream))
790 }
791
792 async fn write_ledger_metadata(
793 &mut self,
794 metadata: &LedgerMetadata,
795 expected_version: MetaVersion,
796 ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
797 let ledger_id = metadata.ledger_id;
798 let data = serde::serialize_ledger_metadata(metadata)?;
799 self.format_ledger_path(ledger_id);
800 let stat = match self.client.check_stat(&self.ledger_path).await? {
801 None => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
802 Some(stat) => stat,
803 };
804 match i64::from(expected_version).cmp(&stat.mzxid) {
805 Ordering::Less => {
806 let metadata = self.read_metadata(ledger_id, &self.ledger_path).await?;
807 return Ok(either::Left(metadata));
808 },
809 Ordering::Greater => {
810 return Err(BkError::with_description(ErrorKind::MetaClientError, &"zk client is not in sync"));
811 },
812 _ => {},
813 }
814
815 let result = self.client.set_data(&self.ledger_path, &data, Some(stat.version)).await;
816 match result {
817 Err(zk::Error::NoNode) => {
818 let err = BkError::new(ErrorKind::LedgerNotExisted);
819 return Err(err);
820 },
821 Err(zk::Error::BadVersion) => {},
822 Err(e) => {
823 let err = BkError::new(ErrorKind::MetaClientError).cause_by(e);
824 return Err(err);
825 },
826 Ok(stat) => {
827 let version = MetaVersion::from(stat);
828 return Ok(either::Right(version));
829 },
830 };
831 let metadata = self.read_metadata(ledger_id, &self.ledger_path).await?;
832 Ok(either::Left(metadata))
833 }
834
835 async fn delete_ledger_path(&self, path: &str, version: Option<i32>) -> BkResult<()> {
836 if let Err(err) = self.client.delete(path, version).await {
837 if err == zk::Error::NoNode {
838 return Err(BkError::new(ErrorKind::LedgerNotExisted));
839 }
840 return Err(From::from(err));
841 };
842 Ok(())
843 }
844
845 async fn watch_bookies(
846 &mut self,
847 path: &'static str,
848 exclude: &'static str,
849 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
850 let (bookies, watcher) = ZkBookieUpdateStream::rebuild(&self.client, path, exclude, &mut self.scratch).await?;
851 let stream = ZkBookieUpdateStream {
852 expired: false,
853 client: self.client.clone(),
854 path,
855 exclude,
856 scratch: String::with_capacity(50),
857 watcher,
858 manager: self.manager.clone(),
859 };
860 Ok((bookies, Box::new(stream)))
861 }
862}
863
864#[derive(Clone)]
865pub struct ZkMetaStore {
866 clients: Arc<Mutex<VecDeque<ZkMetaClient>>>,
867}
868
869impl ZkMetaStore {
870 pub async fn new(mut config: ZkConfiguration) -> Result<ZkMetaStore, BkError> {
871 let configed_ledger_id_format = config.ledger_id_format.take();
872 let zookeeper = zk::Client::builder()
873 .with_session_timeout(config.timeout)
874 .connect(&config.connect)
875 .await?
876 .chroot(&config.root)
877 .unwrap();
878 let client = Arc::new(ZkClient { client: zookeeper });
879 let manager = Arc::new(ZkClientManager::new(config, client.clone()));
880 let layout_data = match client.get_data("/LAYOUT").await {
881 Err(zk::Error::NoNode) => return Err(BkError::new(ErrorKind::MetaClusterUninitialized)),
882 Err(err) => return Err(From::from(err)),
883 Ok((data, _)) => data,
884 };
885 let ledger_id_format = LedgerIdFormat::try_from_ledger_layout(layout_data)?;
886 if let Some(expected_ledger_id_format) = configed_ledger_id_format {
887 if ledger_id_format != expected_ledger_id_format {
888 let msg = format!(
889 "expect ledger id format {}, got {} from ZooKeeper",
890 expected_ledger_id_format, ledger_id_format
891 );
892 return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
893 }
894 }
895 let ledger_hob =
896 if ledger_id_format == LedgerIdFormat::Flat { -1 } else { ZkMetaClient::get_max_hob(&client).await? };
897 let meta_client = ZkMetaClient::new(client, manager, ledger_id_format, ledger_hob);
898 let meta_store =
899 ZkMetaStore { clients: Arc::new(Mutex::new(VecDeque::from([meta_client.clone(), meta_client]))) };
900 Ok(meta_store)
901 }
902
903 fn get_client(&self) -> ZkMetaClientGuard<'_> {
904 let mut clients = self.clients.lock().unwrap();
905 let client = if clients.len() == 1 { clients[0].clone() } else { clients.pop_front().unwrap() };
906 drop(clients);
907 ZkMetaClientGuard { client: MaybeUninit::new(client), store: self }
908 }
909
910 async fn get_connected_client(&self) -> Result<ZkMetaClientGuard<'_>, BkError> {
911 let mut client = self.get_client();
912 client.check_expiration().await?;
913 Ok(client)
914 }
915
916 fn release_client(&self, client: ZkMetaClient) {
917 if client.terminated() {
918 return;
919 }
920 let mut clients = self.clients.lock().unwrap();
921 clients.push_back(client);
922 }
923}
924
925#[async_trait]
926impl LedgerMetadataStoreClient for ZkMetaStore {
927 async fn create_ledger_metadata(&self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
928 let mut client = self.get_connected_client().await?;
929 return client.create_ledger_metadata(metadata).await;
930 }
931
932 async fn remove_ledger_metadata(
933 &self,
934 ledger_id: LedgerId,
935 expected_version: Option<MetaVersion>,
936 ) -> Result<(), BkError> {
937 let mut client = self.get_connected_client().await?;
938 return client.remove_ledger_metadata(ledger_id, expected_version).await;
939 }
940
941 async fn read_ledger_metadata(&self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
942 let mut client = self.get_connected_client().await?;
943 return client.read_ledger_metadata(ledger_id).await;
944 }
945
946 async fn watch_ledger_metadata(
947 &self,
948 ledger_id: LedgerId,
949 start_version: MetaVersion,
950 ) -> BkResult<Box<dyn LedgerMetadataStream>> {
951 let mut client = self.get_connected_client().await?;
952 return client.watch_ledger_metadata(ledger_id, start_version).await;
953 }
954
955 async fn write_ledger_metadata(
956 &self,
957 metadata: &LedgerMetadata,
958 expected_version: MetaVersion,
959 ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
960 let mut client = self.get_connected_client().await?;
961 return client.write_ledger_metadata(metadata, expected_version).await;
962 }
963}
964
965impl MetaStore for ZkMetaStore {}