1#[macro_use]
55extern crate tracing;
56
57mod core;
58mod error;
59mod keyspace;
60mod replication;
61mod rpc;
62mod statistics;
63mod storage;
64#[cfg(any(test, feature = "test-utils"))]
65pub mod test_utils;
66
67use std::borrow::Cow;
68use std::future::Future;
69use std::marker::PhantomData;
70use std::net::SocketAddr;
71use std::sync::Arc;
72use std::time::Duration;
73
74use async_trait::async_trait;
75use datacake_crdt::Key;
76use datacake_node::{
77 ClusterExtension,
78 Consistency,
79 ConsistencyError,
80 DatacakeHandle,
81 DatacakeNode,
82 Nodes,
83};
84pub use error::StoreError;
85use futures::stream::FuturesUnordered;
86use futures::StreamExt;
87pub use statistics::SystemStatistics;
88#[cfg(any(feature = "test-utils", feature = "test-suite"))]
89pub use storage::test_suite;
90pub use storage::{BulkMutationError, ProgressTracker, PutContext, Storage};
91
92pub use self::core::{Document, DocumentMetadata};
93use crate::core::DocVec;
94use crate::keyspace::{
95 Del,
96 KeyspaceGroup,
97 MultiDel,
98 MultiSet,
99 Set,
100 CONSISTENCY_SOURCE_ID,
101};
102use crate::replication::{
103 Mutation,
104 ReplicationCycleContext,
105 ReplicationHandle,
106 TaskDistributor,
107 TaskServiceContext,
108};
109use crate::rpc::services::consistency_impl::ConsistencyService;
110use crate::rpc::services::replication_impl::ReplicationService;
111use crate::rpc::ConsistencyClient;
112
/// Timeout duration reported in consistency-failure errors when not
/// every selected replica acknowledges a change.
const TIMEOUT: Duration = Duration::from_secs(2);

/// Default interval between replication repair cycles.
///
/// Kept to one second in test builds so repair behaviour can be observed
/// quickly; production builds repair once an hour.
const DEFAULT_REPAIR_INTERVAL: Duration = if cfg!(any(test, feature = "test-utils")) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 60)
};
119
/// A [`ClusterExtension`] which attaches an eventually consistent
/// key-value store, backed by a user-provided [`Storage`] implementation,
/// to a Datacake node.
pub struct EventuallyConsistentStoreExtension<S>
where
    S: Storage,
{
    /// The storage backend documents are persisted to.
    datastore: S,
    /// How often the replication repair cycle runs.
    repair_interval: Duration,
}
136
137impl<S> EventuallyConsistentStoreExtension<S>
138where
139 S: Storage,
140{
141 pub fn new(store: S) -> Self {
144 Self {
145 datastore: store,
146 repair_interval: DEFAULT_REPAIR_INTERVAL,
147 }
148 }
149
150 pub fn with_repair_interval(mut self, dur: Duration) -> Self {
152 self.repair_interval = dur;
153 self
154 }
155}
156
#[async_trait]
impl<S> ClusterExtension for EventuallyConsistentStoreExtension<S>
where
    S: Storage,
{
    type Output = EventuallyConsistentStore<S>;
    type Error = StoreError<S::Error>;

    /// Builds the [`EventuallyConsistentStore`], registering its RPC
    /// services and starting its background tasks on the given node.
    async fn init_extension(
        self,
        node: &DatacakeNode,
    ) -> Result<Self::Output, Self::Error> {
        EventuallyConsistentStore::create(self.datastore, self.repair_interval, node)
            .await
    }
}
173
/// An eventually consistent, replicated document store attached to a
/// Datacake node.
///
/// Owns the background task distributor and repair services; both are
/// killed when this struct is dropped.
pub struct EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Handle to the local cluster node.
    node: DatacakeHandle,
    /// The keyspace state, backed by the storage implementation.
    group: KeyspaceGroup<S>,
    /// Background service which distributes mutations to other nodes.
    task_service: TaskDistributor,
    /// Background service running the periodic replication/repair cycle.
    repair_service: ReplicationHandle,
    /// Shared system statistics counters.
    statistics: SystemStatistics,
}
193
impl<S> EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Creates the store against a live node.
    ///
    /// Loads any previously persisted keyspace state, starts the task
    /// distributor and replication cycle background services, spawns a
    /// watcher which forwards membership changes to those services, and
    /// registers the consistency and replication RPC services.
    async fn create(
        datastore: S,
        repair_interval: Duration,
        node: &DatacakeNode,
    ) -> Result<Self, StoreError<S::Error>> {
        let storage = Arc::new(datastore);

        let group = KeyspaceGroup::new(storage.clone(), node.clock().clone()).await;
        let statistics = SystemStatistics::default();

        // Re-hydrate keyspace state from the storage backend before any
        // background services or RPC handlers can observe it.
        group.load_states_from_storage().await?;

        let task_ctx = TaskServiceContext {
            clock: node.clock().clone(),
            network: node.network().clone(),
            local_node_id: node.me().node_id,
            public_node_addr: node.me().public_addr,
        };
        let replication_ctx = ReplicationCycleContext {
            repair_interval,
            group: group.clone(),
            network: node.network().clone(),
        };
        let task_service =
            replication::start_task_distributor_service::<S>(task_ctx).await;
        let repair_service = replication::start_replication_cycle(replication_ctx).await;

        // Keep both background services' view of the cluster membership
        // up to date for as long as the node emits changes.
        tokio::spawn(watch_membership_changes(
            task_service.clone(),
            repair_service.clone(),
            node.handle(),
        ));

        node.add_rpc_service(ConsistencyService::new(
            group.clone(),
            node.network().clone(),
        ));
        node.add_rpc_service(ReplicationService::new(group.clone()));

        Ok(Self {
            node: node.handle(),
            group,
            statistics,
            task_service,
            repair_service,
        })
    }

    /// Returns the store's system statistics counters.
    #[inline]
    pub fn statistics(&self) -> &SystemStatistics {
        &self.statistics
    }

    /// Creates a [`ReplicatedStoreHandle`] which can be used to interact
    /// with the store independently of the owning struct.
    pub fn handle(&self) -> ReplicatedStoreHandle<S> {
        ReplicatedStoreHandle {
            node: self.node.clone(),
            task_service: self.task_service.clone(),
            statistics: self.statistics.clone(),
            group: self.group.clone(),
        }
    }

    /// Creates a handle pre-bound to the given keyspace, so the keyspace
    /// name does not have to be supplied on every operation.
    pub fn handle_with_keyspace(
        &self,
        keyspace: impl Into<String>,
    ) -> ReplicatorKeyspaceHandle<S> {
        ReplicatorKeyspaceHandle {
            inner: self.handle(),
            keyspace: Cow::Owned(keyspace.into()),
        }
    }
}
278
impl<S> Drop for EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Shuts down the mutation distributor and the repair cycle when
    /// the store itself goes away.
    fn drop(&mut self) {
        self.task_service.kill();
        self.repair_service.kill();
    }
}
288
/// A handle for interacting with the replicated store: local reads plus
/// writes and deletes distributed at a chosen [`Consistency`] level.
pub struct ReplicatedStoreHandle<S>
where
    S: Storage,
{
    /// Handle to the local cluster node.
    node: DatacakeHandle,
    /// The keyspace state, backed by the storage implementation.
    group: KeyspaceGroup<S>,
    /// Background service mutations are registered with for distribution.
    task_service: TaskDistributor,
    /// Shared system statistics counters.
    statistics: SystemStatistics,
}
299
// Implemented by hand: a `#[derive(Clone)]` would add an `S: Clone`
// bound, which `Storage` implementations are not required to satisfy.
impl<S> Clone for ReplicatedStoreHandle<S>
where
    S: Storage,
{
    fn clone(&self) -> Self {
        Self {
            node: self.node.clone(),
            group: self.group.clone(),
            task_service: self.task_service.clone(),
            statistics: self.statistics.clone(),
        }
    }
}
313
314impl<S> ReplicatedStoreHandle<S>
315where
316 S: Storage,
317{
318 #[inline]
319 pub fn statistics(&self) -> &SystemStatistics {
321 &self.statistics
322 }
323
324 pub fn with_keyspace(
328 &self,
329 keyspace: impl Into<String>,
330 ) -> ReplicatorKeyspaceHandle<S> {
331 ReplicatorKeyspaceHandle {
332 inner: self.clone(),
333 keyspace: Cow::Owned(keyspace.into()),
334 }
335 }
336
337 pub async fn get(
339 &self,
340 keyspace: &str,
341 doc_id: Key,
342 ) -> Result<Option<Document>, S::Error> {
343 let storage = self.group.storage();
344 storage.get(keyspace, doc_id).await
345 }
346
347 pub async fn get_many<I, T>(
352 &self,
353 keyspace: &str,
354 doc_ids: I,
355 ) -> Result<S::DocsIter, S::Error>
356 where
357 T: Iterator<Item = Key> + Send,
358 I: IntoIterator<IntoIter = T> + Send,
359 {
360 let storage = self.group.storage();
361 storage.multi_get(keyspace, doc_ids.into_iter()).await
362 }
363
364 pub async fn put<D>(
366 &self,
367 keyspace: &str,
368 doc_id: Key,
369 data: D,
370 consistency: Consistency,
371 ) -> Result<(), StoreError<S::Error>>
372 where
373 D: Into<Vec<u8>>,
374 {
375 let nodes = self
376 .node
377 .select_nodes(consistency)
378 .await
379 .map_err(StoreError::ConsistencyError)?;
380
381 let last_updated = self.node.clock().get_time().await;
382 let document = Document::new(doc_id, last_updated, data);
383
384 let keyspace = self.group.get_or_create_keyspace(keyspace).await;
385 let msg = Set {
386 source: CONSISTENCY_SOURCE_ID,
387 doc: document.clone(),
388 ctx: None,
389 _marker: PhantomData::<S>::default(),
390 };
391 keyspace.send(msg).await?;
392
393 self.task_service.mutation(Mutation::Put {
395 keyspace: Cow::Owned(keyspace.name().to_string()),
396 doc: document.clone(),
397 });
398
399 let factory = |node| {
400 let clock = self.node.clock().clone();
401 let keyspace = keyspace.name().to_string();
402 let document = document.clone();
403 async move {
404 let channel = self.node.network().get_or_connect(node);
405
406 let mut client = ConsistencyClient::<S>::new(clock, channel);
407
408 client
409 .put(
410 keyspace,
411 document,
412 self.node.me().node_id,
413 self.node.me().public_addr,
414 )
415 .await
416 .map_err(|e| StoreError::RpcError(node, e))?;
417
418 Ok::<_, StoreError<S::Error>>(())
419 }
420 };
421
422 handle_consistency_distribution::<S, _, _>(nodes, factory).await
423 }
424
425 pub async fn put_many<I, T, D>(
427 &self,
428 keyspace: &str,
429 documents: I,
430 consistency: Consistency,
431 ) -> Result<(), StoreError<S::Error>>
432 where
433 D: Into<Vec<u8>>,
434 T: Iterator<Item = (Key, D)> + Send,
435 I: IntoIterator<IntoIter = T> + Send,
436 {
437 let nodes = self
438 .node
439 .select_nodes(consistency)
440 .await
441 .map_err(StoreError::ConsistencyError)?;
442
443 let last_updated = self.node.clock().get_time().await;
444 let docs = documents
445 .into_iter()
446 .map(|(id, data)| Document::new(id, last_updated, data))
447 .collect::<DocVec<_>>();
448
449 let keyspace = self.group.get_or_create_keyspace(keyspace).await;
450 let msg = MultiSet {
451 source: CONSISTENCY_SOURCE_ID,
452 docs: docs.clone(),
453 ctx: None,
454 _marker: PhantomData::<S>::default(),
455 };
456 keyspace.send(msg).await?;
457
458 self.task_service.mutation(Mutation::MultiPut {
460 keyspace: Cow::Owned(keyspace.name().to_string()),
461 docs: docs.clone(),
462 });
463
464 let factory = |node| {
465 let clock = self.node.clock().clone();
466 let keyspace = keyspace.name().to_string();
467 let documents = docs.clone();
468 let self_member = self.node.me().clone();
469 async move {
470 let channel = self.node.network().get_or_connect(node);
471
472 let mut client = ConsistencyClient::<S>::new(clock, channel);
473
474 client
475 .multi_put(
476 keyspace,
477 documents.into_iter(),
478 self_member.node_id,
479 self_member.public_addr,
480 )
481 .await
482 .map_err(|e| StoreError::RpcError(node, e))?;
483
484 Ok::<_, StoreError<S::Error>>(())
485 }
486 };
487
488 handle_consistency_distribution::<S, _, _>(nodes, factory).await
489 }
490
491 pub async fn del(
493 &self,
494 keyspace: &str,
495 doc_id: Key,
496 consistency: Consistency,
497 ) -> Result<(), StoreError<S::Error>> {
498 let nodes = self
499 .node
500 .select_nodes(consistency)
501 .await
502 .map_err(StoreError::ConsistencyError)?;
503
504 let last_updated = self.node.clock().get_time().await;
505
506 let keyspace = self.group.get_or_create_keyspace(keyspace).await;
507 let doc = DocumentMetadata {
508 id: doc_id,
509 last_updated,
510 };
511 let msg = Del {
512 source: CONSISTENCY_SOURCE_ID,
513 doc,
514 _marker: PhantomData::<S>::default(),
515 };
516 keyspace.send(msg).await?;
517
518 self.task_service.mutation(Mutation::Del {
520 keyspace: Cow::Owned(keyspace.name().to_string()),
521 doc,
522 });
523
524 let factory = |node| {
525 let clock = self.node.clock().clone();
526 let keyspace = keyspace.name().to_string();
527 async move {
528 let channel = self.node.network().get_or_connect(node);
529
530 let mut client = ConsistencyClient::<S>::new(clock, channel);
531
532 client
533 .del(keyspace, doc_id, last_updated)
534 .await
535 .map_err(|e| StoreError::RpcError(node, e))?;
536
537 Ok::<_, StoreError<S::Error>>(())
538 }
539 };
540
541 handle_consistency_distribution::<S, _, _>(nodes, factory).await
542 }
543
544 pub async fn del_many<I, T>(
546 &self,
547 keyspace: &str,
548 doc_ids: I,
549 consistency: Consistency,
550 ) -> Result<(), StoreError<S::Error>>
551 where
552 T: Iterator<Item = Key> + Send,
553 I: IntoIterator<IntoIter = T> + Send,
554 {
555 let nodes = self
556 .node
557 .select_nodes(consistency)
558 .await
559 .map_err(StoreError::ConsistencyError)?;
560
561 let last_updated = self.node.clock().get_time().await;
562 let docs = doc_ids
563 .into_iter()
564 .map(|id| DocumentMetadata { id, last_updated })
565 .collect::<DocVec<_>>();
566
567 let keyspace = self.group.get_or_create_keyspace(keyspace).await;
568 let msg = MultiDel {
569 source: CONSISTENCY_SOURCE_ID,
570 docs: docs.clone(),
571 _marker: PhantomData::<S>::default(),
572 };
573 keyspace.send(msg).await?;
574
575 self.task_service.mutation(Mutation::MultiDel {
577 keyspace: Cow::Owned(keyspace.name().to_string()),
578 docs: docs.clone(),
579 });
580
581 let factory = |node| {
582 let clock = self.node.clock().clone();
583 let keyspace = keyspace.name().to_string();
584 let docs = docs.clone();
585 async move {
586 let channel = self.node.network().get_or_connect(node);
587
588 let mut client = ConsistencyClient::<S>::new(clock, channel);
589
590 client
591 .multi_del(keyspace, docs)
592 .await
593 .map_err(|e| StoreError::RpcError(node, e))?;
594
595 Ok::<_, StoreError<S::Error>>(())
596 }
597 };
598
599 handle_consistency_distribution::<S, _, _>(nodes, factory).await
600 }
601}
602
/// A [`ReplicatedStoreHandle`] bound to a single keyspace, so the
/// keyspace name does not have to be supplied on every operation.
pub struct ReplicatorKeyspaceHandle<S>
where
    S: Storage,
{
    /// The underlying store handle all operations delegate to.
    inner: ReplicatedStoreHandle<S>,
    /// The keyspace every operation on this handle targets.
    keyspace: Cow<'static, str>,
}
611
// Implemented by hand: a `#[derive(Clone)]` would add an `S: Clone`
// bound, which `Storage` implementations are not required to satisfy.
impl<S> Clone for ReplicatorKeyspaceHandle<S>
where
    S: Storage,
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            keyspace: self.keyspace.clone(),
        }
    }
}
623
624impl<S> ReplicatorKeyspaceHandle<S>
625where
626 S: Storage,
627{
628 pub async fn get(&self, doc_id: Key) -> Result<Option<Document>, S::Error> {
630 self.inner.get(self.keyspace.as_ref(), doc_id).await
631 }
632
633 pub async fn get_many<I, T>(&self, doc_ids: I) -> Result<S::DocsIter, S::Error>
638 where
639 T: Iterator<Item = Key> + Send,
640 I: IntoIterator<IntoIter = T> + Send,
641 {
642 self.inner.get_many(self.keyspace.as_ref(), doc_ids).await
643 }
644
645 pub async fn put(
647 &self,
648 doc_id: Key,
649 data: Vec<u8>,
650 consistency: Consistency,
651 ) -> Result<(), StoreError<S::Error>> {
652 self.inner
653 .put(self.keyspace.as_ref(), doc_id, data, consistency)
654 .await
655 }
656
657 pub async fn put_many<I, T>(
659 &self,
660 documents: I,
661 consistency: Consistency,
662 ) -> Result<(), StoreError<S::Error>>
663 where
664 T: Iterator<Item = (Key, Vec<u8>)> + Send,
665 I: IntoIterator<IntoIter = T> + Send,
666 {
667 self.inner
668 .put_many(self.keyspace.as_ref(), documents, consistency)
669 .await
670 }
671
672 pub async fn del(
674 &self,
675 doc_id: Key,
676 consistency: Consistency,
677 ) -> Result<(), StoreError<S::Error>> {
678 self.inner
679 .del(self.keyspace.as_ref(), doc_id, consistency)
680 .await
681 }
682
683 pub async fn del_many<I, T>(
685 &self,
686 doc_ids: I,
687 consistency: Consistency,
688 ) -> Result<(), StoreError<S::Error>>
689 where
690 T: Iterator<Item = Key> + Send,
691 I: IntoIterator<IntoIter = T> + Send,
692 {
693 self.inner
694 .del_many(self.keyspace.as_ref(), doc_ids, consistency)
695 .await
696 }
697}
698
699async fn watch_membership_changes(
703 task_service: TaskDistributor,
704 repair_service: ReplicationHandle,
705 node_handle: DatacakeHandle,
706) {
707 let mut changes = node_handle.membership_changes();
708 while let Some(members) = changes.next().await {
709 task_service.membership_change(members.clone());
710 repair_service.membership_change(members.clone());
711 }
712}
713
714async fn handle_consistency_distribution<S, CB, F>(
715 nodes: Nodes,
716 factory: CB,
717) -> Result<(), StoreError<S::Error>>
718where
719 S: Storage,
720 CB: FnMut(SocketAddr) -> F,
721 F: Future<Output = Result<(), StoreError<S::Error>>>,
722{
723 let mut num_success = 0;
724 let num_required = nodes.len();
725
726 let mut requests = nodes
727 .into_iter()
728 .map(factory)
729 .collect::<FuturesUnordered<_>>();
730
731 while let Some(res) = requests.next().await {
732 match res {
733 Ok(()) => {
734 num_success += 1;
735 },
736 Err(StoreError::RpcError(node, error)) => {
737 error!(
738 error = ?error,
739 target_node = %node,
740 "Replica failed to acknowledge change to meet consistency level requirement."
741 );
742 },
743 Err(StoreError::TransportError(node, error)) => {
744 error!(
745 error = ?error,
746 target_node = %node,
747 "Replica failed to acknowledge change to meet consistency level requirement."
748 );
749 },
750 Err(other) => {
751 error!(
752 error = ?other,
753 "Failed to send action to replica due to unknown error.",
754 );
755 },
756 }
757 }
758
759 if num_success != num_required {
760 Err(StoreError::ConsistencyError(
761 ConsistencyError::ConsistencyFailure {
762 responses: num_success,
763 required: num_required,
764 timeout: TIMEOUT,
765 },
766 ))
767 } else {
768 Ok(())
769 }
770}