1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use compact_str::CompactString;
7use either::Either;
use etcd_client::{
    Client,
    Compare,
    CompareOp,
    ConnectOptions,
    Event,
    EventType,
    GetOptions,
    KvClient,
    PutOptions,
    Txn,
    TxnOp,
    TxnOpResponse,
    WatchClient,
    WatchOptions,
    WatchStream,
    Watcher,
};
25use ignore_result::Ignore;
26use uuid::Uuid;
27
28use super::serde;
29use super::types::{
30 self,
31 BookieRegistrationClient,
32 BookieServiceInfo,
33 BookieUpdate,
34 LedgerIdStoreClient,
35 LedgerMetadataStoreClient,
36 MetaStore,
37 MetaVersion,
38 Versioned,
39};
40use crate::client::errors::{BkError, ErrorKind};
41use crate::client::{LedgerId, LedgerMetadata};
42
43pub struct BookieUpdateStream {
44 bookie_path: String,
45 stream: WatchStream,
46 events: VecDeque<Event>,
47}
48
49#[async_trait]
50impl types::BookieUpdateStream for BookieUpdateStream {
51 async fn next(&mut self) -> Result<BookieUpdate, BkError> {
52 loop {
53 while let Some(event) = self.events.pop_front() {
54 if let Some(kv) = event.kv() {
55 let result = if event.event_type() == EventType::Put {
56 let bookie_service_info =
57 serde::deserialize_bookie_service_info(&self.bookie_path, kv.key(), kv.value())?;
58 BookieUpdate::Add(bookie_service_info)
59 } else {
60 let bookie_id = serde::deserialize_bookie_id(&self.bookie_path, kv.key())?;
61 BookieUpdate::Remove(bookie_id)
62 };
63 return Ok(result);
64 }
65 }
66 if let Some(response) = self.stream.message().await? {
67 let events = response.events();
68 events.iter().for_each(|e| self.events.push_back(e.clone()));
69 }
70 }
71 }
72}
73
74impl BookieUpdateStream {
75 fn new(bookie_path: String, stream: WatchStream) -> BookieUpdateStream {
76 BookieUpdateStream { bookie_path, stream, events: VecDeque::new() }
77 }
78}
79
80pub struct LedgerMetadataStream {
81 ledger_id: LedgerId,
82 watcher: Watcher,
83 stream: WatchStream,
84 events: VecDeque<Event>,
85 cancelled: bool,
86}
87
88#[async_trait]
89impl types::LedgerMetadataStream for LedgerMetadataStream {
90 async fn cancel(&mut self) {
91 if self.cancelled {
92 return;
93 }
94 self.events.clear();
95 self.cancelled = true;
96 self.watcher.cancel().await.ignore();
97 }
98
99 async fn next(&mut self) -> Result<Versioned<LedgerMetadata>, BkError> {
100 loop {
101 while let Some(event) = self.events.pop_front() {
102 if let Some(kv) = event.kv() {
103 if event.event_type() == EventType::Delete {
104 return Err(BkError::new(ErrorKind::LedgerNotExisted));
105 }
106 let version = kv.mod_revision();
107 let metadata = serde::deserialize_ledger_metadata(self.ledger_id, kv.value())?;
108 return Ok(Versioned::new(MetaVersion(version), metadata));
109 }
110 }
111 if let Some(response) = self.stream.message().await? {
112 let events = response.events();
113 events.iter().for_each(|e| self.events.push_back(e.clone()));
114 };
115 }
116 }
117}
118
119pub struct EtcdConfiguration {
120 scope: CompactString,
121 #[allow(dead_code)]
122 user: Option<(String, String)>,
123 #[allow(dead_code)]
124 keep_alive: Option<(Duration, Duration)>,
125}
126
127impl EtcdConfiguration {
128 pub fn new(scope: CompactString) -> EtcdConfiguration {
129 EtcdConfiguration { scope, user: None, keep_alive: None }
130 }
131
132 #[allow(dead_code)]
133 pub fn with_user(self, name: String, password: String) -> EtcdConfiguration {
134 EtcdConfiguration { user: Some((name, password)), ..self }
135 }
136
137 #[allow(dead_code)]
138 pub fn with_keep_alive(self, interval: Duration, timeout: Duration) -> EtcdConfiguration {
139 EtcdConfiguration { keep_alive: Some((interval, timeout)), ..self }
140 }
141}
142
143pub struct EtcdMetaStore {
144 scope: CompactString,
145 client: KvClient,
146 watcher: WatchClient,
147 bucket_counter: AtomicU64,
148}
149
150impl Clone for EtcdMetaStore {
151 fn clone(&self) -> EtcdMetaStore {
152 EtcdMetaStore {
153 scope: self.scope.clone(),
154 client: self.client.clone(),
155 watcher: self.watcher.clone(),
156 bucket_counter: AtomicU64::new(0),
157 }
158 }
159}
160
161unsafe impl Send for EtcdMetaStore {}
162unsafe impl Sync for EtcdMetaStore {}
163
/// Number of etcd counter keys ("buckets") that ledger-id allocation rotates through.
const NUM_BUCKETS: u64 = 128;
/// Bit position at which the bucket index is placed inside a generated ledger id.
const BUCKET_SHIFT: i32 = 56;
/// Largest per-bucket sequence number: all 56 bits below `BUCKET_SHIFT` set.
const MAX_ID_PER_BUCKET: u64 = 0x00ff_ffff_ffff_ffff;
167
168impl EtcdMetaStore {
169 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
170 endpoints: S,
171 configuration: EtcdConfiguration,
172 ) -> Result<EtcdMetaStore, BkError> {
173 let client = Client::connect(endpoints, None).await?;
174 Ok(EtcdMetaStore {
175 scope: configuration.scope,
176 client: client.kv_client(),
177 watcher: client.watch_client(),
178 bucket_counter: AtomicU64::new(0),
179 })
180 }
181
182 fn next_bucket(&self) -> u64 {
183 let u = self.bucket_counter.fetch_add(1, Ordering::Relaxed);
184 u % NUM_BUCKETS
185 }
186
187 fn bucket_path(&self, bucket: u64) -> String {
188 format!("{}/buckets/{:03}", self.scope, bucket)
189 }
190
191 fn ledger_path(&self, ledger_id: LedgerId) -> String {
192 let least64: i64 = ledger_id.into();
193 let combined_id = least64 as u64 as u128;
194 let uuid = Uuid::from_u128(combined_id);
195 format!("{}/ledgers/{}", self.scope, uuid)
196 }
197
198 fn writable_bookie_directory_path(&self) -> String {
199 format!("{}/bookies/writable/", self.scope)
200 }
201
202 fn readable_bookie_directory_path(&self) -> String {
203 format!("{}/bookies/readable/", self.scope)
204 }
205
206 async fn watch_bookies_update(
207 &self,
208 bookie_path: String,
209 start_revision: i64,
210 ) -> Result<BookieUpdateStream, BkError> {
211 let mut bookie_path_end: Vec<u8> = bookie_path.clone().into();
212 bookie_path_end.push(0xff);
213 let options = WatchOptions::new().with_range(bookie_path_end).with_start_revision(start_revision);
214 let (_watcher, stream) = self.watcher.clone().watch(bookie_path.clone(), Some(options)).await?;
215 Ok(BookieUpdateStream::new(bookie_path, stream))
216 }
217
218 async fn watch_bookies(
219 &self,
220 bookie_path: String,
221 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
222 let mut bookie_path_end: Vec<u8> = bookie_path.clone().into();
223 bookie_path_end.push(0xff);
224 let options = GetOptions::new().with_range(bookie_path_end).with_serializable();
225 let mut client = self.client.clone();
226 let response = client.get(bookie_path.clone(), Some(options)).await?;
227 let Some(header) = response.header() else {
228 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header");
229 return Err(err);
230 };
231 let start_revision = header.revision();
232 let mut err: Option<BkError> = None;
233 let bookies: Vec<BookieServiceInfo> = response
234 .kvs()
235 .iter()
236 .filter_map(|kv| match serde::deserialize_bookie_service_info(&bookie_path, kv.key(), kv.value()) {
237 Err(e) => {
238 err = Some(e);
239 None
240 },
241 Ok(bookie) => Some(bookie),
242 })
243 .collect();
244 if let Some(err) = err {
245 return Err(err);
246 }
247 let update_stream = self.watch_bookies_update(bookie_path, start_revision + 1).await?;
248 Ok((bookies, Box::new(update_stream)))
249 }
250}
251
252#[async_trait]
253impl BookieRegistrationClient for EtcdMetaStore {
254 async fn watch_readable_bookies(
255 &mut self,
256 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
257 let bookie_path = self.writable_bookie_directory_path();
258 self.watch_bookies(bookie_path).await
259 }
260
261 async fn watch_writable_bookies(
262 &mut self,
263 ) -> Result<(Vec<BookieServiceInfo>, Box<dyn types::BookieUpdateStream>), BkError> {
264 let bookie_path = self.readable_bookie_directory_path();
265 self.watch_bookies(bookie_path).await
266 }
267}
268
269#[async_trait]
270impl LedgerIdStoreClient for EtcdMetaStore {
271 async fn generate_ledger_id(&self) -> Result<LedgerId, BkError> {
272 let bucket_id = self.next_bucket();
273 let bucket_path = self.bucket_path(bucket_id);
274 let options = PutOptions::new().with_prev_key();
275 let mut client = self.client.clone();
276 let response = client.put(bucket_path, Vec::new(), Some(options)).await?;
277 let previous_version = response.prev_key().map_or(0, |kv| kv.version()) as u64;
278 let version = previous_version + 1;
279 assert!(version <= MAX_ID_PER_BUCKET);
280 let id = (bucket_id << BUCKET_SHIFT) | version;
281 return Ok(LedgerId(id as i64));
282 }
283}
284
285impl From<etcd_client::Error> for BkError {
286 fn from(e: etcd_client::Error) -> BkError {
287 BkError::new(ErrorKind::MetaClientError).cause_by(e)
288 }
289}
290
291impl From<prost::DecodeError> for BkError {
292 fn from(e: prost::DecodeError) -> BkError {
293 BkError::new(ErrorKind::MetaInvalidData).cause_by(e)
294 }
295}
296
297#[async_trait]
298impl LedgerMetadataStoreClient for EtcdMetaStore {
299 async fn create_ledger_metadata(&self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError> {
300 let ledger_path = self.ledger_path(metadata.ledger_id);
301 let serialized_metadata = serde::serialize_ledger_metadata(metadata)?;
302 let compare = Compare::create_revision(ledger_path.clone(), CompareOp::Greater, 0);
303 let create = TxnOp::put(ledger_path, serialized_metadata, None);
304 let txn = Txn::new().when(vec![compare]).or_else(vec![create]);
305 let mut client = self.client.clone();
306 let response = client.txn(txn).await?;
307 if response.succeeded() {
308 return Err(BkError::new(ErrorKind::LedgerExisted));
309 }
310 let Some(TxnOpResponse::Put(put_response)) = response.op_responses().into_iter().next() else {
311 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"put succeed with no put response");
312 return Err(err);
313 };
314 let Some(revision) = put_response.header().map(|h| h.revision()) else {
315 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in put response");
316 return Err(err);
317 };
318 return Ok(MetaVersion::from(revision));
319 }
320
321 async fn remove_ledger_metadata(
322 &self,
323 ledger_id: LedgerId,
324 expected_version: Option<MetaVersion>,
325 ) -> Result<(), BkError> {
326 let ledger_path = self.ledger_path(ledger_id);
327 let mut client = self.client.clone();
328 let Some(version) = expected_version else {
329 let response = client.delete(ledger_path, None).await?;
330 if response.deleted() <= 0 {
331 return Err(BkError::new(ErrorKind::LedgerNotExisted));
332 }
333 return Ok(());
334 };
335 let compare = Compare::mod_revision(ledger_path.clone(), CompareOp::Equal, version.into());
336 let delete = TxnOp::delete(ledger_path.clone(), None);
337 let get_op = TxnOp::get(ledger_path, Some(GetOptions::new().with_count_only()));
338 let txn = Txn::new().when(vec![compare]).and_then(vec![delete]).or_else(vec![get_op]);
339 let response = client.txn(txn).await?;
340 if response.succeeded() {
341 return Ok(());
342 }
343 let Some(TxnOpResponse::Get(get_response)) = response.op_responses().into_iter().next() else {
344 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"get succeed with no get response");
345 return Err(err);
346 };
347 if get_response.count() != 0 {
348 return Err(BkError::new(ErrorKind::MetaVersionMismatch));
349 }
350 return Err(BkError::new(ErrorKind::LedgerNotExisted));
351 }
352
353 async fn read_ledger_metadata(&self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError> {
354 let ledger_path = self.ledger_path(ledger_id);
355 let mut client = self.client.clone();
356 let response = client.get(ledger_path, None).await?;
357 let Some(kv) = response.kvs().first() else {
358 return Err(BkError::new(ErrorKind::LedgerNotExisted));
359 };
360 let metadata = serde::deserialize_ledger_metadata(ledger_id, kv.value())?;
361 let version = MetaVersion::from(kv.mod_revision());
362 return Ok(Versioned::new(version, metadata));
363 }
364
365 async fn watch_ledger_metadata(
366 &self,
367 ledger_id: LedgerId,
368 start_version: MetaVersion,
369 ) -> Result<Box<dyn types::LedgerMetadataStream>, BkError> {
370 let ledger_path = self.ledger_path(ledger_id);
371 let options = WatchOptions::new().with_start_revision(start_version.into());
372 let mut client = self.watcher.clone();
373 let (watcher, stream) = client.watch(ledger_path, Some(options)).await?;
374 let stream = LedgerMetadataStream { ledger_id, watcher, stream, events: VecDeque::new(), cancelled: false };
375 return Ok(Box::new(stream));
376 }
377
378 async fn write_ledger_metadata(
379 &self,
380 metadata: &LedgerMetadata,
381 expected_version: MetaVersion,
382 ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError> {
383 let serialized_metadata = serde::serialize_ledger_metadata(metadata)?;
384 let ledger_path = self.ledger_path(metadata.ledger_id);
385 let compare = Compare::mod_revision(ledger_path.clone(), CompareOp::Equal, expected_version.into());
386 let put_op = TxnOp::put(ledger_path.clone(), serialized_metadata, None);
387 let get_op = TxnOp::get(ledger_path, Some(GetOptions::new().with_count_only()));
388 let txn = Txn::new().when(vec![compare]).and_then(vec![put_op]).or_else(vec![get_op]);
389 let mut client = self.client.clone();
390 let response = client.txn(txn).await?;
391 if response.succeeded() {
392 let Some(TxnOpResponse::Put(put_response)) = response.op_responses().into_iter().next() else {
393 let err =
394 BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"put succeed with no put response");
395 return Err(err);
396 };
397 let Some(revision) = put_response.header().map(|h| h.revision()) else {
398 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in put response");
399 return Err(err);
400 };
401 return Ok(either::Right(MetaVersion::from(revision)));
402 }
403 let Some(TxnOpResponse::Get(get_response)) = response.op_responses().into_iter().next() else {
404 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"get succeed with no get response");
405 return Err(err);
406 };
407 let Some(kv) = get_response.kvs().first() else {
408 return Err(BkError::new(ErrorKind::LedgerNotExisted));
409 };
410 let Some(conflicting_revision) = get_response.header().map(|h| h.revision()) else {
411 let err = BkError::with_description(ErrorKind::MetaUnexpectedResponse, &"no header in get response");
412 return Err(err);
413 };
414 let conflicting_metadata = serde::deserialize_ledger_metadata(metadata.ledger_id, kv.value())?;
415 Ok(either::Left(Versioned::new(conflicting_revision, conflicting_metadata)))
416 }
417}
418
419impl MetaStore for EtcdMetaStore {}