bookkeeper_client/meta/
zookeeper.rs

1use std::cmp::Ordering;
2use std::collections::VecDeque;
3use std::fmt::Write as _;
4use std::mem::MaybeUninit;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use compact_str::CompactString;
10use either::Either;
11use ignore_result::Ignore;
12use log::debug;
13use uuid::Uuid;
14use zookeeper_client as zk;
15
16use super::serde;
17use super::types::{
18    self,
19    BookieRegistrationClient,
20    BookieServiceInfo,
21    BookieUpdate,
22    BookieUpdateStream,
23    LedgerIdStoreClient,
24    LedgerMetadataStoreClient,
25    LedgerMetadataStream,
26    MetaStore,
27    MetaVersion,
28    Versioned,
29};
30use crate::client::errors::{BkError, BkResult, ErrorKind};
31use crate::client::service_uri::ServiceUri;
32use crate::client::{BookieId, LedgerId, LedgerMetadata};
33
34#[derive(Clone)]
35pub struct ZkConfiguration {
36    ledger_id_format: Option<LedgerIdFormat>,
37    root: CompactString,
38    connect: CompactString,
39    timeout: Duration,
40}
41
42impl ZkConfiguration {
43    pub fn from_service_uri(uri: ServiceUri) -> Result<ZkConfiguration, BkError> {
44        assert_eq!(uri.scheme, "zk");
45        let ServiceUri { spec, address, path, .. } = uri;
46        let ledger_id_format = match spec.as_str() {
47            "" | "null" => None,
48            "flat" => Some(LedgerIdFormat::Flat),
49            "hierarchical" => Some(LedgerIdFormat::Hierarchical),
50            "longhierarchical" => Some(LedgerIdFormat::LongHierarchical),
51            _ => {
52                return Err(BkError::with_message(
53                    ErrorKind::InvalidServiceUri,
54                    format!("unknown ledger id format {}", spec),
55                ))
56            },
57        };
58        let config =
59            ZkConfiguration { ledger_id_format, root: path, connect: address, timeout: Duration::from_secs(10) };
60        Ok(config)
61    }
62}
63
/// Internal failure reported while waiting for a ZooKeeper session to become usable.
enum Error {
    /// The session ended up in a state other than connected or disconnected.
    SessionExpired,
}
67
68impl From<Error> for BkError {
69    fn from(err: Error) -> BkError {
70        use Error::*;
71        match err {
72            SessionExpired => BkError::with_description(ErrorKind::MetaClientError, &"zk session expired"),
73        }
74    }
75}
76
77struct ZkClient {
78    client: zk::Client,
79}
80
81impl ZkClient {
82    fn terminated(&self) -> bool {
83        let state = self.client.state();
84        state.is_terminated()
85    }
86
87    async fn connected(&self) -> Result<(), Error> {
88        let mut watcher = self.client.state_watcher();
89        let mut state = watcher.state();
90        loop {
91            match state {
92                zk::SessionState::SyncConnected => return Ok(()),
93                zk::SessionState::Disconnected => {},
94                _ => return Err(Error::SessionExpired),
95            };
96            state = watcher.changed().await;
97        }
98    }
99}
100
101impl std::ops::Deref for ZkClient {
102    type Target = zk::Client;
103
104    fn deref(&self) -> &zk::Client {
105        &self.client
106    }
107}
108
109impl From<zk::Stat> for MetaVersion {
110    fn from(stat: zk::Stat) -> MetaVersion {
111        MetaVersion(stat.mzxid)
112    }
113}
114
115#[async_trait]
116impl BookieRegistrationClient for ZkMetaStore {
117    async fn watch_readable_bookies(
118        &mut self,
119    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
120        let mut client = self.get_connected_client().await?;
121        client.watch_bookies("/available/readonly", "").await
122    }
123
124    async fn watch_writable_bookies(
125        &mut self,
126    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
127        let mut client = self.get_connected_client().await?;
128        client.watch_bookies("/available", "readonly").await
129    }
130}
131
132#[async_trait]
133impl LedgerIdStoreClient for ZkMetaStore {
134    async fn generate_ledger_id(&self) -> Result<LedgerId, BkError> {
135        let mut client = self.get_connected_client().await?;
136        client.generate_ledger_id().await
137    }
138}
139
140impl From<zk::Error> for BkError {
141    fn from(err: zk::Error) -> BkError {
142        let kind = match err {
143            zk::Error::BadVersion => ErrorKind::MetaVersionMismatch,
144            _ => ErrorKind::MetaClientError,
145        };
146        BkError::new(kind).cause_by(err)
147    }
148}
149
150struct ZkSessionClient {
151    client: Arc<ZkClient>,
152    counter: usize,
153    session: String,
154}
155
156struct ZkClientManager {
157    config: ZkConfiguration,
158    client: tokio::sync::Mutex<ZkSessionClient>,
159}
160
161impl ZkClientManager {
162    fn new(config: ZkConfiguration, client: Arc<ZkClient>) -> ZkClientManager {
163        let client = ZkSessionClient { client, counter: 0, session: Uuid::new_v4().to_string() };
164        ZkClientManager { config, client: tokio::sync::Mutex::new(client) }
165    }
166
167    async fn replace_client(&self, client: &Arc<ZkClient>) -> Result<Arc<ZkClient>, BkError> {
168        let mut guard = self.client.lock().await;
169        if !Arc::ptr_eq(client, &guard.client) {
170            return Ok(guard.client.clone());
171        }
172        let zookeeper =
173            zk::Client::builder().with_session_timeout(self.config.timeout).connect(&self.config.connect).await?;
174        let client = Arc::new(ZkClient { client: zookeeper });
175        guard.session.truncate(36);
176        guard.counter += 1;
177        let counter = guard.counter;
178        write!(&mut guard.session, "-{}", counter).ignore();
179        let options = zk::CreateMode::Ephemeral.with_acls(zk::Acls::creator_all());
180        // A quorum write is necessary to ensure cross-client linearizable.
181        client.create(&guard.session, Default::default(), &options).await?;
182        guard.client = client.clone();
183        Ok(client)
184    }
185}
186
187struct ZkLedgerMetadataStream {
188    watcher: Option<zk::OneshotWatcher>,
189    version: i64,
190    updated: i64,
191    ledger_id: LedgerId,
192    ledger_path: String,
193    client: Arc<ZkClient>,
194    manager: Arc<ZkClientManager>,
195}
196
197impl ZkLedgerMetadataStream {
198    async fn new_client(&mut self) -> Result<(), BkError> {
199        let client = self.manager.replace_client(&self.client).await?;
200        self.client = client;
201        Ok(())
202    }
203
204    async fn connected(&mut self) -> Result<(), BkError> {
205        match self.client.connected().await {
206            Ok(_) => Ok(()),
207            Err(Error::SessionExpired) => self.new_client().await,
208        }
209    }
210
211    async fn watch_exists(&mut self) -> Result<(Option<zk::Stat>, zk::OneshotWatcher), zk::Error> {
212        self.client.check_and_watch_stat(&self.ledger_path).await
213    }
214}
215
216#[async_trait]
217impl LedgerMetadataStream for ZkLedgerMetadataStream {
218    async fn cancel(&mut self) {}
219
220    async fn next(&mut self) -> Result<Versioned<LedgerMetadata>, BkError> {
221        loop {
222            if self.updated > self.version {
223                match self.client.get_data(&self.ledger_path).await {
224                    Ok((data, stat)) if stat.mzxid > self.version => {
225                        self.version = stat.mzxid;
226                        let metadata = serde::deserialize_ledger_metadata(self.ledger_id, &data)?;
227                        return Ok(Versioned::new(stat, metadata));
228                    },
229                    Err(zk::Error::NoNode) => {
230                        self.version = self.updated;
231                        return Err(BkError::new(ErrorKind::LedgerNotExisted));
232                    },
233                    _ => {},
234                };
235            }
236            if let Some(watcher) = self.watcher.take() {
237                watcher.changed().await;
238            }
239            match self.watch_exists().await {
240                Ok(r) => {
241                    let (stat, watcher) = r;
242                    self.watcher = Some(watcher);
243                    if let Some(stat) = stat {
244                        self.updated = self.updated.max(stat.mzxid);
245                        continue;
246                    }
247                    return Err(BkError::new(ErrorKind::LedgerNotExisted));
248                },
249                Err(zk::Error::ConnectionLoss) => {
250                    self.connected().await.ignore();
251                },
252                Err(zk::Error::SessionExpired) => {
253                    self.new_client().await?;
254                },
255                Err(e) => {
256                    return Err(BkError::new(ErrorKind::MetaClientError).cause_by(e));
257                },
258            };
259        }
260    }
261}
262
263struct ZkBookieUpdateStream {
264    path: &'static str,
265    exclude: &'static str,
266    scratch: String,
267    client: Arc<ZkClient>,
268    manager: Arc<ZkClientManager>,
269    watcher: zk::PersistentWatcher,
270    expired: bool,
271}
272
273impl ZkBookieUpdateStream {
274    async fn new_client(&mut self) -> Result<(), BkError> {
275        let client = self.manager.replace_client(&self.client).await?;
276        self.client = client;
277        Ok(())
278    }
279
280    async fn connected(&mut self) -> Result<(), BkError> {
281        match self.client.connected().await {
282            Ok(_) => Ok(()),
283            Err(_) => self.new_client().await,
284        }
285    }
286
287    async fn get_bookies(
288        client: &ZkClient,
289        path: &str,
290        children: &[String],
291        exclude: &str,
292        scratch: &mut String,
293    ) -> Result<Vec<BookieServiceInfo>, BkError> {
294        let mut bookies = Vec::with_capacity(children.len());
295        for bookie_path in children.iter() {
296            if !exclude.is_empty() && bookie_path.starts_with(exclude) {
297                continue;
298            }
299            scratch.clear();
300            write!(scratch, "{}/{}", path, &bookie_path).ignore();
301            let data = match client.get_data(scratch).await {
302                Err(zk::Error::NoNode) => continue,
303                Err(err) => return Err(From::from(err)),
304                Ok((data, _)) => data,
305            };
306            let bookie_id = BookieId::new(bookie_path);
307            let bookie_result = if data.is_empty() {
308                BookieServiceInfo::from_legacy(bookie_id)
309            } else {
310                BookieServiceInfo::from_protobuf(bookie_id, &data)
311            };
312            match bookie_result {
313                Err(_) => continue,
314                Ok(bookie) => bookies.push(bookie),
315            }
316        }
317        Ok(bookies)
318    }
319
320    async fn rebuild(
321        client: &ZkClient,
322        path: &str,
323        exclude: &str,
324        scratch: &mut String,
325    ) -> Result<(Vec<BookieServiceInfo>, zk::PersistentWatcher), BkError> {
326        let watcher = client.watch(path, zk::AddWatchMode::PersistentRecursive).await?;
327        let children = match client.list_children(path).await {
328            Err(zk::Error::NoNode) => Default::default(),
329            Err(err) => return Err(From::from(err)),
330            Ok(children) => children,
331        };
332        let bookies = Self::get_bookies(client, path, &children, exclude, scratch).await?;
333        Ok((bookies, watcher))
334    }
335
336    async fn on_session_event(&mut self, state: zk::SessionState) -> Result<(), BkError> {
337        if state == zk::SessionState::Disconnected {
338            self.connected().await?;
339        } else if state.is_terminated() {
340            self.new_client().await?;
341            self.expired = true;
342        }
343        Ok(())
344    }
345}
346
347#[async_trait]
348impl BookieUpdateStream for ZkBookieUpdateStream {
349    async fn next(&mut self) -> BkResult<BookieUpdate> {
350        loop {
351            if self.expired {
352                let (bookies, watcher) =
353                    Self::rebuild(&self.client, self.path, self.exclude, &mut self.scratch).await?;
354                self.watcher = watcher;
355                self.expired = false;
356                return Ok(BookieUpdate::Reconstruction(bookies));
357            }
358            let event = self.watcher.changed().await;
359            if event.event_type == zk::EventType::Session {
360                self.on_session_event(event.session_state).await?;
361                continue;
362            } else if event.event_type != zk::EventType::NodeChildrenChanged {
363                continue;
364            }
365            let bookie_path = match event.path.strip_prefix(self.path).and_then(|path| path.strip_prefix('/')) {
366                None => continue,
367                Some(bookie_path) => bookie_path,
368            };
369            if bookie_path == self.exclude {
370                continue;
371            }
372            let bookie_id = BookieId::new(bookie_path);
373            if event.event_type == zk::EventType::NodeDeleted {
374                return Ok(BookieUpdate::Remove(bookie_id));
375            }
376            let (data, _) = match self.client.get_data(&event.path).await {
377                Err(_) => continue,
378                Ok(result) => result,
379            };
380            let bookie_result = if data.is_empty() {
381                BookieServiceInfo::from_legacy(bookie_id.clone())
382            } else {
383                BookieServiceInfo::from_protobuf(bookie_id.clone(), &data)
384            };
385            let bookie = match bookie_result {
386                Err(err) => {
387                    debug!("fail to parse bookie info for bookie id {}: {}", bookie_id, err);
388                    continue;
389                },
390                Ok(bookie) => bookie,
391            };
392            return Ok(BookieUpdate::Add(bookie));
393        }
394    }
395}
396
397struct ZkMetaClientGuard<'a> {
398    client: MaybeUninit<ZkMetaClient>,
399    store: &'a ZkMetaStore,
400}
401
402impl std::ops::Deref for ZkMetaClientGuard<'_> {
403    type Target = ZkMetaClient;
404
405    fn deref(&self) -> &ZkMetaClient {
406        unsafe { &*self.client.as_ptr() }
407    }
408}
409
410impl std::ops::DerefMut for ZkMetaClientGuard<'_> {
411    fn deref_mut(&mut self) -> &mut ZkMetaClient {
412        unsafe { &mut *self.client.as_mut_ptr() }
413    }
414}
415
416impl Drop for ZkMetaClientGuard<'_> {
417    fn drop(&mut self) {
418        let client = unsafe { self.client.as_ptr().read() };
419        self.store.release_client(client);
420    }
421}
422
423#[derive(Copy, Clone, PartialEq, Eq, strum::Display)]
424enum LedgerIdFormat {
425    #[strum(serialize = "flat")]
426    Flat,
427    #[strum(serialize = "hierarchical")]
428    Hierarchical,
429    #[strum(serialize = "longhierarchical")]
430    LongHierarchical,
431}
432
433impl LedgerIdFormat {
434    fn try_from_ledger_layout(data: Vec<u8>) -> Result<LedgerIdFormat, BkError> {
435        let Ok(s) = String::from_utf8(data) else {
436            return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"ledger layout is not utf8"));
437        };
438        let mut lines = s.split('\n');
439        let Ok(format_version) = lines.next().unwrap().parse::<i32>() else {
440            return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"invalid ledger layout format version"));
441        };
442        if format_version != 1 && format_version != 2 {
443            let msg = format!("unsupported ledger layout format version {}", format_version);
444            return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
445        }
446        let Some(manager_line) = lines.next() else {
447            return Err(BkError::with_description(ErrorKind::MetaInvalidData, &"no ledger manager in ledger layout"));
448        };
449        let mut manager_splits = manager_line.split(':');
450        let manager_class = manager_splits.next().unwrap();
451        let Some(manager_version) = manager_splits.next() else {
452            return Err(BkError::with_description(
453                ErrorKind::MetaInvalidData,
454                &"no ledger manager version in ledger layout",
455            ));
456        };
457        if manager_version.parse::<i32>().is_err() {
458            return Err(BkError::with_description(
459                ErrorKind::MetaInvalidData,
460                &"invalid ledger manager version in ledger layout",
461            ));
462        };
463        if format_version == 1 {
464            if manager_class == "flat" {
465                return Ok(LedgerIdFormat::Flat);
466            } else if manager_class == "hierarchical" {
467                return Ok(LedgerIdFormat::Hierarchical);
468            }
469            let msg =
470                format!("unknown ledger manager type {} in ledger layout version {}", manager_class, format_version);
471            return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
472        }
473        if manager_class == "org.apache.bookkeeper.meta.FlatLedgerManagerFactory" {
474            return Ok(LedgerIdFormat::Flat);
475        } else if manager_class == "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory" {
476            return Ok(LedgerIdFormat::Hierarchical);
477        } else if manager_class == "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory" {
478            return Ok(LedgerIdFormat::LongHierarchical);
479        }
480        let msg = format!("unknown ledger manager class {} in ledger layout version {}", manager_class, format_version);
481        Err(BkError::with_message(ErrorKind::MetaInvalidData, msg))
482    }
483}
484
485#[derive(Clone)]
486struct ZkMetaClient {
487    client: Arc<ZkClient>,
488    scratch: String,
489    ledger_root: CompactString,
490    ledger_id_format: LedgerIdFormat,
491    ledger_hob: i32,
492    ledger_path: String,
493    manager: Arc<ZkClientManager>,
494}
495
496impl ZkMetaClient {
497    fn new(
498        client: Arc<ZkClient>,
499        manager: Arc<ZkClientManager>,
500        ledger_id_format: LedgerIdFormat,
501        ledger_hob: i32,
502    ) -> ZkMetaClient {
503        ZkMetaClient {
504            client,
505            scratch: String::with_capacity(50),
506            ledger_root: Default::default(),
507            ledger_id_format,
508            ledger_hob,
509            ledger_path: String::with_capacity(50),
510            manager,
511        }
512    }
513
514    fn terminated(&self) -> bool {
515        self.client.terminated()
516    }
517
518    async fn check_expiration(&mut self) -> Result<(), BkError> {
519        if self.terminated() {
520            self.client = self.manager.replace_client(&self.client).await?;
521        }
522        Ok(())
523    }
524
525    fn format_flat_ledger_path(&mut self, ledger_id: LedgerId) {
526        write!(&mut self.ledger_path, "{}/L{:010}", &self.ledger_root, ledger_id).unwrap();
527    }
528
529    fn format_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
530        if ledger_id.0 < i32::MAX as i64 {
531            self.format_short_hierarchical_ledger_path(ledger_id);
532        } else {
533            self.format_long_hierarchical_ledger_path(ledger_id);
534        }
535    }
536
537    fn format_short_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
538        self.scratch.clear();
539        write!(&mut self.scratch, "{:010}", ledger_id).unwrap();
540        write!(
541            &mut self.ledger_path,
542            "{}/{}/{}/{}",
543            &self.ledger_root,
544            &self.scratch[..2],
545            &self.scratch[2..6],
546            &self.scratch[6..10]
547        )
548        .unwrap();
549    }
550
551    fn format_long_hierarchical_ledger_path(&mut self, ledger_id: LedgerId) {
552        write!(&mut self.scratch, "{:019}", ledger_id).unwrap();
553        write!(
554            &mut self.ledger_path,
555            "{}/{}/{}/{}/{}/L{}",
556            &self.ledger_root,
557            &self.scratch[..3],
558            &self.scratch[3..7],
559            &self.scratch[7..11],
560            &self.scratch[11..15],
561            &self.scratch[15..19]
562        )
563        .unwrap();
564    }
565
566    fn format_ledger_path(&mut self, ledger_id: LedgerId) {
567        self.ledger_path.clear();
568        match self.ledger_id_format {
569            LedgerIdFormat::Flat => self.format_flat_ledger_path(ledger_id),
570            LedgerIdFormat::Hierarchical => self.format_hierarchical_ledger_path(ledger_id),
571            LedgerIdFormat::LongHierarchical => self.format_long_hierarchical_ledger_path(ledger_id),
572        };
573    }
574
575    async fn generate_flat_ledger_id(&mut self) -> Result<LedgerId, BkError> {
576        let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
577        let (_, sequence) = self.client.create("/ID-", Default::default(), &options).await?;
578        if sequence.into_i64() < 0 {
579            return Err(BkError::new(ErrorKind::LedgerIdOverflow));
580        }
581        Ok(LedgerId(sequence.into_i64()))
582    }
583
584    async fn generate_short_ledger_id(&self) -> Result<i64, BkError> {
585        let (_, sequence) =
586            self.create_path_optimistic("/idgen/ID-", Default::default(), zk::CreateMode::EphemeralSequential).await?;
587        Ok(sequence.into_i64())
588    }
589
590    async fn create_ledger_hob_directory(&mut self, i: i32) -> Result<(), zk::Error> {
591        write!(&mut self.scratch, "/idgen-long/{:010}", i).ignore();
592        self.create_directory_optimistic(&self.scratch).await
593    }
594
595    async fn generate_ledger_lob_id(&mut self, hob: i32) -> Result<i32, zk::Error> {
596        write!(&mut self.scratch, "/idgen-long/{:010}/ID-", hob).ignore();
597        let (_, sequence) =
598            self.create_path_optimistic(&self.scratch, Default::default(), zk::CreateMode::EphemeralSequential).await?;
599        Ok(sequence.into_i64() as i32)
600    }
601
602    fn extract_max_hob(children: &[String]) -> Result<i32, BkError> {
603        let mut max_hob = i32::MIN;
604        for child in children.iter() {
605            let hob_path = match child.strip_prefix("HOB-") {
606                None => {
607                    let msg = format!("invalid ledger generation path {}", child);
608                    return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
609                },
610                Some(path) => path,
611            };
612            let hob = match hob_path.parse::<i32>() {
613                Err(_) => {
614                    let msg = format!("invalid ledger generation path {}", child);
615                    return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
616                },
617                Ok(n) => n,
618            };
619            max_hob = max_hob.max(hob);
620        }
621        if max_hob <= 0 {
622            let msg = format!("no valid ledger generation path, last one is {}", children.last().unwrap());
623            return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
624        }
625        Ok(max_hob)
626    }
627
628    async fn get_max_hob(client: &ZkClient) -> Result<i32, BkError> {
629        let children = match client.list_children("/idgen-long").await {
630            Err(zk::Error::NoNode) => return Ok(-1),
631            Err(err) => return Err(From::from(err)),
632            Ok(children) => children,
633        };
634        Self::extract_max_hob(&children)
635    }
636
637    async fn generate_ledger_hob(&mut self) -> Result<(), BkError> {
638        let mut children = self.client.list_children("/idgen-long").await?;
639        if children.is_empty() {
640            self.create_ledger_hob_directory(1).await?;
641            children = self.client.list_children("/idgen-long").await?;
642        }
643        if children.is_empty() {
644            return Err(BkError::new(ErrorKind::MetaConcurrentOperation));
645        }
646        self.ledger_hob = Self::extract_max_hob(&children)?;
647        Ok(())
648    }
649
650    async fn generate_long_ledger_id(&mut self) -> Result<LedgerId, BkError> {
651        loop {
652            if self.ledger_hob < 0 {
653                self.generate_ledger_hob().await?;
654            }
655            let hob = self.ledger_hob;
656            let lob = self.generate_ledger_lob_id(hob).await?;
657            if (0..i32::MAX).contains(&lob) {
658                let id = (hob as i64) << 32 | (lob as i64);
659                return Ok(LedgerId(id));
660            }
661            self.ledger_hob = -1;
662        }
663    }
664
665    async fn generate_ledger_id(&mut self) -> Result<LedgerId, BkError> {
666        if self.ledger_id_format == LedgerIdFormat::Flat {
667            return self.generate_flat_ledger_id().await;
668        }
669        if self.ledger_hob <= 0 {
670            let sequence = self.generate_short_ledger_id().await?;
671            if sequence >= 0 && sequence < i32::MAX as i64 {
672                return Ok(LedgerId(sequence));
673            }
674            self.create_directory_optimistic("/idgen-long").await?;
675            self.create_ledger_hob_directory(1).await?;
676            self.ledger_hob = 1;
677        }
678        self.generate_long_ledger_id().await
679    }
680
681    async fn create_ledger_metadata(&mut self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
682        let data = serde::serialize_ledger_metadata(metadata)?;
683        self.format_ledger_path(metadata.ledger_id);
684        let (stat, _) = self.create_path_optimistic(&self.ledger_path, &data, zk::CreateMode::Persistent).await?;
685        Ok(MetaVersion::from(stat))
686    }
687
688    #[async_recursion::async_recursion]
689    async fn create_directory_optimistic(&self, path: &str) -> Result<(), zk::Error> {
690        let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
691        loop {
692            let result = self.client.create(path, Default::default(), &options).await;
693            match result {
694                Err(zk::Error::NoNode) => {
695                    let last_slash_index = path.rfind('/').unwrap();
696                    let parent = &path[..last_slash_index];
697                    self.create_directory_optimistic(parent).await?;
698                },
699                Err(zk::Error::NodeExists) => {
700                    return Ok(());
701                },
702                Err(err) => return Err(err),
703                Ok(_) => return Ok(()),
704            }
705        }
706    }
707
708    async fn create_path_optimistic(
709        &self,
710        path: &str,
711        data: &[u8],
712        mode: zk::CreateMode,
713    ) -> Result<(zk::Stat, zk::CreateSequence), zk::Error> {
714        let options = mode.with_acls(zk::Acls::anyone_all());
715        loop {
716            let result = self.client.create(path, data, &options).await;
717            match result {
718                Err(zk::Error::NoNode) => {
719                    let last_slash_index = path.rfind('/').unwrap();
720                    let parent = &path[..last_slash_index];
721                    self.create_directory_optimistic(parent).await?;
722                },
723                Err(err) => return Err(err),
724                Ok(result) => {
725                    return Ok(result);
726                },
727            };
728        }
729    }
730
731    async fn read_metadata(
732        &self,
733        ledger_id: LedgerId,
734        ledger_path: &str,
735    ) -> Result<Versioned<LedgerMetadata>, BkError> {
736        let (data, stat) = match self.client.get_data(ledger_path).await {
737            Err(zk::Error::NoNode) => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
738            Err(err) => return Err(From::from(err)),
739            Ok(result) => result,
740        };
741        let metadata = serde::deserialize_ledger_metadata(ledger_id, &data)?;
742        Ok(Versioned::new(stat, metadata))
743    }
744
745    async fn read_ledger_metadata(&mut self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
746        self.format_ledger_path(ledger_id);
747        self.read_metadata(ledger_id, &self.ledger_path).await
748    }
749
750    async fn remove_ledger_metadata(
751        &mut self,
752        ledger_id: LedgerId,
753        expected_version: Option<MetaVersion>,
754    ) -> Result<(), BkError> {
755        self.format_ledger_path(ledger_id);
756        let xid = match expected_version {
757            None => return self.delete_ledger_path(&self.ledger_path, None).await,
758            Some(version) => version.0,
759        };
760        let stat = match self.client.check_stat(&self.ledger_path).await? {
761            None => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
762            Some(stat) => stat,
763        };
764        if stat.mzxid != xid {
765            return Err(BkError::new(ErrorKind::MetaVersionMismatch));
766        }
767        self.delete_ledger_path(&self.ledger_path, Some(stat.version)).await
768    }
769
770    async fn watch_ledger_metadata(
771        &mut self,
772        ledger_id: LedgerId,
773        start_version: MetaVersion,
774    ) -> BkResult<Box<dyn LedgerMetadataStream>> {
775        self.format_ledger_path(ledger_id);
776        let (stat, watcher) = match self.client.check_and_watch_stat(&self.ledger_path).await? {
777            (None, _) => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
778            (Some(stat), watcher) => (stat, watcher),
779        };
780        let metadata_stream = ZkLedgerMetadataStream {
781            watcher: Some(watcher),
782            version: start_version.into(),
783            updated: stat.mzxid,
784            ledger_id,
785            ledger_path: self.ledger_path.clone(),
786            client: self.client.clone(),
787            manager: self.manager.clone(),
788        };
789        Ok(Box::new(metadata_stream))
790    }
791
792    async fn write_ledger_metadata(
793        &mut self,
794        metadata: &LedgerMetadata,
795        expected_version: MetaVersion,
796    ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
797        let ledger_id = metadata.ledger_id;
798        let data = serde::serialize_ledger_metadata(metadata)?;
799        self.format_ledger_path(ledger_id);
800        let stat = match self.client.check_stat(&self.ledger_path).await? {
801            None => return Err(BkError::new(ErrorKind::LedgerNotExisted)),
802            Some(stat) => stat,
803        };
804        match i64::from(expected_version).cmp(&stat.mzxid) {
805            Ordering::Less => {
806                let metadata = self.read_metadata(ledger_id, &self.ledger_path).await?;
807                return Ok(either::Left(metadata));
808            },
809            Ordering::Greater => {
810                return Err(BkError::with_description(ErrorKind::MetaClientError, &"zk client is not in sync"));
811            },
812            _ => {},
813        }
814
815        let result = self.client.set_data(&self.ledger_path, &data, Some(stat.version)).await;
816        match result {
817            Err(zk::Error::NoNode) => {
818                let err = BkError::new(ErrorKind::LedgerNotExisted);
819                return Err(err);
820            },
821            Err(zk::Error::BadVersion) => {},
822            Err(e) => {
823                let err = BkError::new(ErrorKind::MetaClientError).cause_by(e);
824                return Err(err);
825            },
826            Ok(stat) => {
827                let version = MetaVersion::from(stat);
828                return Ok(either::Right(version));
829            },
830        };
831        let metadata = self.read_metadata(ledger_id, &self.ledger_path).await?;
832        Ok(either::Left(metadata))
833    }
834
835    async fn delete_ledger_path(&self, path: &str, version: Option<i32>) -> BkResult<()> {
836        if let Err(err) = self.client.delete(path, version).await {
837            if err == zk::Error::NoNode {
838                return Err(BkError::new(ErrorKind::LedgerNotExisted));
839            }
840            return Err(From::from(err));
841        };
842        Ok(())
843    }
844
845    async fn watch_bookies(
846        &mut self,
847        path: &'static str,
848        exclude: &'static str,
849    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
850        let (bookies, watcher) = ZkBookieUpdateStream::rebuild(&self.client, path, exclude, &mut self.scratch).await?;
851        let stream = ZkBookieUpdateStream {
852            expired: false,
853            client: self.client.clone(),
854            path,
855            exclude,
856            scratch: String::with_capacity(50),
857            watcher,
858            manager: self.manager.clone(),
859        };
860        Ok((bookies, Box::new(stream)))
861    }
862}
863
/// BookKeeper metadata store backed by ZooKeeper.
///
/// Cloning is cheap: every clone shares the same pool of [`ZkMetaClient`]s through one
/// `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct ZkMetaStore {
    // Pool of metadata clients: checked out by `get_client`, put back by `release_client`.
    clients: Arc<Mutex<VecDeque<ZkMetaClient>>>,
}
868
869impl ZkMetaStore {
870    pub async fn new(mut config: ZkConfiguration) -> Result<ZkMetaStore, BkError> {
871        let configed_ledger_id_format = config.ledger_id_format.take();
872        let zookeeper = zk::Client::builder()
873            .with_session_timeout(config.timeout)
874            .connect(&config.connect)
875            .await?
876            .chroot(&config.root)
877            .unwrap();
878        let client = Arc::new(ZkClient { client: zookeeper });
879        let manager = Arc::new(ZkClientManager::new(config, client.clone()));
880        let layout_data = match client.get_data("/LAYOUT").await {
881            Err(zk::Error::NoNode) => return Err(BkError::new(ErrorKind::MetaClusterUninitialized)),
882            Err(err) => return Err(From::from(err)),
883            Ok((data, _)) => data,
884        };
885        let ledger_id_format = LedgerIdFormat::try_from_ledger_layout(layout_data)?;
886        if let Some(expected_ledger_id_format) = configed_ledger_id_format {
887            if ledger_id_format != expected_ledger_id_format {
888                let msg = format!(
889                    "expect ledger id format {}, got {} from ZooKeeper",
890                    expected_ledger_id_format, ledger_id_format
891                );
892                return Err(BkError::with_message(ErrorKind::MetaInvalidData, msg));
893            }
894        }
895        let ledger_hob =
896            if ledger_id_format == LedgerIdFormat::Flat { -1 } else { ZkMetaClient::get_max_hob(&client).await? };
897        let meta_client = ZkMetaClient::new(client, manager, ledger_id_format, ledger_hob);
898        let meta_store =
899            ZkMetaStore { clients: Arc::new(Mutex::new(VecDeque::from([meta_client.clone(), meta_client]))) };
900        Ok(meta_store)
901    }
902
903    fn get_client(&self) -> ZkMetaClientGuard<'_> {
904        let mut clients = self.clients.lock().unwrap();
905        let client = if clients.len() == 1 { clients[0].clone() } else { clients.pop_front().unwrap() };
906        drop(clients);
907        ZkMetaClientGuard { client: MaybeUninit::new(client), store: self }
908    }
909
910    async fn get_connected_client(&self) -> Result<ZkMetaClientGuard<'_>, BkError> {
911        let mut client = self.get_client();
912        client.check_expiration().await?;
913        Ok(client)
914    }
915
916    fn release_client(&self, client: ZkMetaClient) {
917        if client.terminated() {
918            return;
919        }
920        let mut clients = self.clients.lock().unwrap();
921        clients.push_back(client);
922    }
923}
924
925#[async_trait]
926impl LedgerMetadataStoreClient for ZkMetaStore {
927    async fn create_ledger_metadata(&self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
928        let mut client = self.get_connected_client().await?;
929        return client.create_ledger_metadata(metadata).await;
930    }
931
932    async fn remove_ledger_metadata(
933        &self,
934        ledger_id: LedgerId,
935        expected_version: Option<MetaVersion>,
936    ) -> Result<(), BkError> {
937        let mut client = self.get_connected_client().await?;
938        return client.remove_ledger_metadata(ledger_id, expected_version).await;
939    }
940
941    async fn read_ledger_metadata(&self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
942        let mut client = self.get_connected_client().await?;
943        return client.read_ledger_metadata(ledger_id).await;
944    }
945
946    async fn watch_ledger_metadata(
947        &self,
948        ledger_id: LedgerId,
949        start_version: MetaVersion,
950    ) -> BkResult<Box<dyn LedgerMetadataStream>> {
951        let mut client = self.get_connected_client().await?;
952        return client.watch_ledger_metadata(ledger_id, start_version).await;
953    }
954
955    async fn write_ledger_metadata(
956        &self,
957        metadata: &LedgerMetadata,
958        expected_version: MetaVersion,
959    ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
960        let mut client = self.get_connected_client().await?;
961        return client.write_ledger_metadata(metadata, expected_version).await;
962    }
963}
964
// `ZkMetaStore` adds no `MetaStore`-specific methods here; the impl body is empty.
// NOTE(review): presumably `MetaStore` only bundles the component client traits (such as
// `LedgerMetadataStoreClient`) and/or provides default methods — confirm in `types`.
impl MetaStore for ZkMetaStore {}