bookkeeper_client/meta/
etcd.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use compact_str::CompactString;
7use either::Either;
8use etcd_client::{
9    Client,
10    Compare,
11    CompareOp,
12    Event,
13    EventType,
14    GetOptions,
15    KvClient,
16    PutOptions,
17    Txn,
18    TxnOp,
19    TxnOpResponse,
20    WatchClient,
21    WatchOptions,
22    WatchStream,
23    Watcher,
24};
25use ignore_result::Ignore;
26use uuid::Uuid;
27
28use super::serde;
29use super::types::{
30    self,
31    BookieRegistrationClient,
32    BookieServiceInfo,
33    BookieUpdate,
34    LedgerIdStoreClient,
35    LedgerMetadataStoreClient,
36    MetaStore,
37    MetaVersion,
38    Versioned,
39};
40use crate::client::errors::{BkError, ErrorKind};
41use crate::client::{LedgerId, LedgerMetadata};
42
43pub struct BookieUpdateStream {
44    bookie_path: String,
45    stream: WatchStream,
46    events: VecDeque<Event>,
47}
48
49#[async_trait]
50impl types::BookieUpdateStream for BookieUpdateStream {
51    async fn next(&mut self) -> Result<BookieUpdate, BkError> {
52        loop {
53            while let Some(event) = self.events.pop_front() {
54                if let Some(kv) = event.kv() {
55                    let result = if event.event_type() == EventType::Put {
56                        let bookie_service_info =
57                            serde::deserialize_bookie_service_info(&self.bookie_path, kv.key(), kv.value())?;
58                        BookieUpdate::Add(bookie_service_info)
59                    } else {
60                        let bookie_id = serde::deserialize_bookie_id(&self.bookie_path, kv.key())?;
61                        BookieUpdate::Remove(bookie_id)
62                    };
63                    return Ok(result);
64                }
65            }
66            if let Some(response) = self.stream.message().await? {
67                let events = response.events();
68                events.iter().for_each(|e| self.events.push_back(e.clone()));
69            }
70        }
71    }
72}
73
74impl BookieUpdateStream {
75    fn new(bookie_path: String, stream: WatchStream) -> BookieUpdateStream {
76        BookieUpdateStream { bookie_path, stream, events: VecDeque::new() }
77    }
78}
79
80pub struct LedgerMetadataStream {
81    ledger_id: LedgerId,
82    watcher: Watcher,
83    stream: WatchStream,
84    events: VecDeque<Event>,
85    cancelled: bool,
86}
87
88#[async_trait]
89impl types::LedgerMetadataStream for LedgerMetadataStream {
90    async fn cancel(&mut self) {
91        if self.cancelled {
92            return;
93        }
94        self.events.clear();
95        self.cancelled = true;
96        self.watcher.cancel().await.ignore();
97    }
98
99    async fn next(&mut self) -> Result<Versioned<LedgerMetadata>, BkError> {
100        loop {
101            while let Some(event) = self.events.pop_front() {
102                if let Some(kv) = event.kv() {
103                    if event.event_type() == EventType::Delete {
104                        return Err(BkError::new(ErrorKind::LedgerNotExisted));
105                    }
106                    let version = kv.mod_revision();
107                    let metadata = serde::deserialize_ledger_metadata(self.ledger_id, kv.value())?;
108                    return Ok(Versioned::new(MetaVersion(version), metadata));
109                }
110            }
111            if let Some(response) = self.stream.message().await? {
112                let events = response.events();
113                events.iter().for_each(|e| self.events.push_back(e.clone()));
114            };
115        }
116    }
117}
118
119pub struct EtcdConfiguration {
120    scope: CompactString,
121    #[allow(dead_code)]
122    user: Option<(String, String)>,
123    #[allow(dead_code)]
124    keep_alive: Option<(Duration, Duration)>,
125}
126
127impl EtcdConfiguration {
128    pub fn new(scope: CompactString) -> EtcdConfiguration {
129        EtcdConfiguration { scope, user: None, keep_alive: None }
130    }
131
132    #[allow(dead_code)]
133    pub fn with_user(self, name: String, password: String) -> EtcdConfiguration {
134        EtcdConfiguration { user: Some((name, password)), ..self }
135    }
136
137    #[allow(dead_code)]
138    pub fn with_keep_alive(self, interval: Duration, timeout: Duration) -> EtcdConfiguration {
139        EtcdConfiguration { keep_alive: Some((interval, timeout)), ..self }
140    }
141}
142
143pub struct EtcdMetaStore {
144    scope: CompactString,
145    client: KvClient,
146    watcher: WatchClient,
147    bucket_counter: AtomicU64,
148}
149
150impl Clone for EtcdMetaStore {
151    fn clone(&self) -> EtcdMetaStore {
152        EtcdMetaStore {
153            scope: self.scope.clone(),
154            client: self.client.clone(),
155            watcher: self.watcher.clone(),
156            bucket_counter: AtomicU64::new(0),
157        }
158    }
159}
160
161unsafe impl Send for EtcdMetaStore {}
162unsafe impl Sync for EtcdMetaStore {}
163
/// Number of id buckets; the bucket counter is reduced modulo this value.
const NUM_BUCKETS: u64 = 1 << 7;
/// Bit position at which the bucket number is placed inside a generated ledger id.
const BUCKET_SHIFT: i32 = 56;
/// Largest per-bucket sequence number, i.e. every bit below `BUCKET_SHIFT` set.
const MAX_ID_PER_BUCKET: u64 = (1u64 << BUCKET_SHIFT) - 1;
167
168impl EtcdMetaStore {
169    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
170        endpoints: S,
171        configuration: EtcdConfiguration,
172    ) -> Result<EtcdMetaStore, BkError> {
173        let client = Client::connect(endpoints, None).await?;
174        Ok(EtcdMetaStore {
175            scope: configuration.scope,
176            client: client.kv_client(),
177            watcher: client.watch_client(),
178            bucket_counter: AtomicU64::new(0),
179        })
180    }
181
182    fn next_bucket(&self) -> u64 {
183        let u = self.bucket_counter.fetch_add(1, Ordering::Relaxed);
184        u % NUM_BUCKETS
185    }
186
187    fn bucket_path(&self, bucket: u64) -> String {
188        format!("{}/buckets/{:03}", self.scope, bucket)
189    }
190
191    fn ledger_path(&self, ledger_id: LedgerId) -> String {
192        let least64: i64 = ledger_id.into();
193        let combined_id = least64 as u64 as u128;
194        let uuid = Uuid::from_u128(combined_id);
195        format!("{}/ledgers/{}", self.scope, uuid)
196    }
197
198    fn writable_bookie_directory_path(&self) -> String {
199        format!("{}/bookies/writable/", self.scope)
200    }
201
202    fn readable_bookie_directory_path(&self) -> String {
203        format!("{}/bookies/readable/", self.scope)
204    }
205
206    async fn watch_bookies_update(
207        &self,
208        bookie_path: String,
209        start_revision: i64,
210    ) -> Result<BookieUpdateStream, BkError> {
211        let mut bookie_path_end: Vec<u8> = bookie_path.clone().into();
212        bookie_path_end.push(0xff);
213        let options = WatchOptions::new().with_range(bookie_path_end).with_start_revision(start_revision);
214        let (_watcher, stream) = self.watcher.clone().watch(bookie_path.clone(), Some(options)).await?;
215        Ok(BookieUpdateStream::new(bookie_path, stream))
216    }
217
218    async fn watch_bookies(
219        &self,
220        bookie_path: String,
221    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
222        let mut bookie_path_end: Vec<u8> = bookie_path.clone().into();
223        bookie_path_end.push(0xff);
224        let options = GetOptions::new().with_range(bookie_path_end).with_serializable();
225        let mut client = self.client.clone();
226        let response = client.get(bookie_path.clone(), Some(options)).await?;
227        let Some(header) = response.header() else {
228            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header");
229            return Err(err);
230        };
231        let start_revision = header.revision();
232        let mut err: Option<BkError> = None;
233        let bookies: Vec<BookieServiceInfo> = response
234            .kvs()
235            .iter()
236            .filter_map(|kv| match serde::deserialize_bookie_service_info(&bookie_path, kv.key(), kv.value()) {
237                Err(e) => {
238                    err = Some(e);
239                    None
240                },
241                Ok(bookie) => Some(bookie),
242            })
243            .collect();
244        if let Some(err) = err {
245            return Err(err);
246        }
247        let update_stream = self.watch_bookies_update(bookie_path, start_revision + 1).await?;
248        Ok((bookies, Box::new(update_stream)))
249    }
250}
251
252#[async_trait]
253impl BookieRegistrationClient for EtcdMetaStore {
254    async fn watch_readable_bookies(
255        &mut self,
256    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
257        let bookie_path = self.writable_bookie_directory_path();
258        self.watch_bookies(bookie_path).await
259    }
260
261    async fn watch_writable_bookies(
262        &mut self,
263    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
264        let bookie_path = self.readable_bookie_directory_path();
265        self.watch_bookies(bookie_path).await
266    }
267}
268
269#[async_trait]
270impl LedgerIdStoreClient for EtcdMetaStore {
271    async fn generate_ledger_id(&self) -> Result<LedgerId, BkError> {
272        let bucket_id = self.next_bucket();
273        let bucket_path = self.bucket_path(bucket_id);
274        let options = PutOptions::new().with_prev_key();
275        let mut client = self.client.clone();
276        let response = client.put(bucket_path, Vec::new(), Some(options)).await?;
277        let previous_version = response.prev_key().map_or(0, |kv| kv.version()) as u64;
278        let version = previous_version + 1;
279        assert!(version <= MAX_ID_PER_BUCKET);
280        let id = (bucket_id << BUCKET_SHIFT) | version;
281        return Ok(LedgerId(id as i64));
282    }
283}
284
285impl From<etcd_client::Error> for BkError {
286    fn from(e: etcd_client::Error) -> BkError {
287        BkError::new(ErrorKind::MetaClientError).cause_by(e)
288    }
289}
290
291impl From<prost::DecodeError> for BkError {
292    fn from(e: prost::DecodeError) -> BkError {
293        BkError::new(ErrorKind::MetaInvalidData).cause_by(e)
294    }
295}
296
297#[async_trait]
298impl LedgerMetadataStoreClient for EtcdMetaStore {
299    async fn create_ledger_metadata(&self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
300        let ledger_path = self.ledger_path(metadata.ledger_id);
301        let serialized_metadata = serde::serialize_ledger_metadata(metadata)?;
302        let compare = Compare::create_revision(ledger_path.clone(), CompareOp::Greater, 0);
303        let create = TxnOp::put(ledger_path, serialized_metadata, None);
304        let txn = Txn::new().when(vec![compare]).or_else(vec![create]);
305        let mut client = self.client.clone();
306        let response = client.txn(txn).await?;
307        if response.succeeded() {
308            return Err(BkError::new(ErrorKind::LedgerExisted));
309        }
310        let Some(TxnOpResponse::Put(put_response)) = response.op_responses().into_iter().next() else {
311            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"put succeed with no put response");
312            return Err(err);
313        };
314        let Some(revision) = put_response.header().map(|h| h.revision()) else {
315            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in put response");
316            return Err(err);
317        };
318        return Ok(MetaVersion::from(revision));
319    }
320
321    async fn remove_ledger_metadata(
322        &self,
323        ledger_id: LedgerId,
324        expected_version: Option<MetaVersion>,
325    ) -> Result<(), BkError> {
326        let ledger_path = self.ledger_path(ledger_id);
327        let mut client = self.client.clone();
328        let Some(version) = expected_version else {
329            let response = client.delete(ledger_path, None).await?;
330            if response.deleted() <= 0 {
331                return Err(BkError::new(ErrorKind::LedgerNotExisted));
332            }
333            return Ok(());
334        };
335        let compare = Compare::mod_revision(ledger_path.clone(), CompareOp::Equal, version.into());
336        let delete = TxnOp::delete(ledger_path.clone(), None);
337        let get_op = TxnOp::get(ledger_path, Some(GetOptions::new().with_count_only()));
338        let txn = Txn::new().when(vec![compare]).and_then(vec![delete]).or_else(vec![get_op]);
339        let response = client.txn(txn).await?;
340        if response.succeeded() {
341            return Ok(());
342        }
343        let Some(TxnOpResponse::Get(get_response)) = response.op_responses().into_iter().next() else {
344            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"get succeed with no get response");
345            return Err(err);
346        };
347        if get_response.count() != 0 {
348            return Err(BkError::new(ErrorKind::MetaVersionMismatch));
349        }
350        return Err(BkError::new(ErrorKind::LedgerNotExisted));
351    }
352
353    async fn read_ledger_metadata(&self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
354        let ledger_path = self.ledger_path(ledger_id);
355        let mut client = self.client.clone();
356        let response = client.get(ledger_path, None).await?;
357        let Some(kv) = response.kvs().first() else {
358            return Err(BkError::new(ErrorKind::LedgerNotExisted));
359        };
360        let metadata = serde::deserialize_ledger_metadata(ledger_id, kv.value())?;
361        let version = MetaVersion::from(kv.mod_revision());
362        return Ok(Versioned::new(version, metadata));
363    }
364
365    async fn watch_ledger_metadata(
366        &self,
367        ledger_id: LedgerId,
368        start_version: MetaVersion,
369    ) -> Result<Box<dyn types::LedgerMetadataStream>, BkError> {
370        let ledger_path = self.ledger_path(ledger_id);
371        let options = WatchOptions::new().with_start_revision(start_version.into());
372        let mut client = self.watcher.clone();
373        let (watcher, stream) = client.watch(ledger_path, Some(options)).await?;
374        let stream = LedgerMetadataStream { ledger_id, watcher, stream, events: VecDeque::new(), cancelled: false };
375        return Ok(Box::new(stream));
376    }
377
378    async fn write_ledger_metadata(
379        &self,
380        metadata: &LedgerMetadata,
381        expected_version: MetaVersion,
382    ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
383        let serialized_metadata = serde::serialize_ledger_metadata(metadata)?;
384        let ledger_path = self.ledger_path(metadata.ledger_id);
385        let compare = Compare::mod_revision(ledger_path.clone(), CompareOp::Equal, expected_version.into());
386        let put_op = TxnOp::put(ledger_path.clone(), serialized_metadata, None);
387        let get_op = TxnOp::get(ledger_path, Some(GetOptions::new().with_count_only()));
388        let txn = Txn::new().when(vec![compare]).and_then(vec![put_op]).or_else(vec![get_op]);
389        let mut client = self.client.clone();
390        let response = client.txn(txn).await?;
391        if response.succeeded() {
392            let Some(TxnOpResponse::Put(put_response)) = response.op_responses().into_iter().next() else {
393                let err =
394                    BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"put succeed with no put response");
395                return Err(err);
396            };
397            let Some(revision) = put_response.header().map(|h| h.revision()) else {
398                let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in put response");
399                return Err(err);
400            };
401            return Ok(either::Right(MetaVersion::from(revision)));
402        }
403        let Some(TxnOpResponse::Get(get_response)) = response.op_responses().into_iter().next() else {
404            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"get succeed with no get response");
405            return Err(err);
406        };
407        let Some(kv) = get_response.kvs().first() else {
408            return Err(BkError::new(ErrorKind::LedgerNotExisted));
409        };
410        let Some(conflicting_revision) = get_response.header().map(|h| h.revision()) else {
411            let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in get response");
412            return Err(err);
413        };
414        let conflicting_metadata = serde::deserialize_ledger_metadata(metadata.ledger_id, kv.value())?;
415        Ok(either::Left(Versioned::new(conflicting_revision, conflicting_metadata)))
416    }
417}
418
419impl MetaStore for EtcdMetaStore {}