datacake_eventual_consistency/
lib.rs

1//! # Datacake Cluster
2//! A batteries included library for building your own distributed data stores or replicated state.
3//!
4//! This library is largely based on the same concepts as Riak and Cassandra. Consensus, membership and failure
5//! detection are managed by [Quickwit's Chitchat](https://github.com/quickwit-oss/chitchat) while state alignment
6//! and replication is managed by [Datacake CRDT](https://github.com/lnx-search/datacake/tree/main/datacake-crdt).
7//!
8//! RPC is provided and managed entirely within Datacake using [Tonic](https://crates.io/crates/tonic) and GRPC.
9//!
10//! This library is focused around providing a simple and easy to build framework for your distributed apps without
11//! being overwhelming. In fact, you can be up and running just by implementing 2 async traits.
12//!
13//! ## Basic Example
14//!
15//! ```rust
16//! use std::net::SocketAddr;
17//! use datacake_node::{Consistency, ConnectionConfig, DCAwareSelector, DatacakeNodeBuilder};
18//! use datacake_eventual_consistency::test_utils::MemStore;
19//! use datacake_eventual_consistency::EventuallyConsistentStoreExtension;
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
24//!     let connection_cfg = ConnectionConfig::new(addr, addr, Vec::<String>::new());
25//!     let node = DatacakeNodeBuilder::<DCAwareSelector>::new(1, connection_cfg)
26//!         .connect()
27//!         .await
28//!         .expect("Connect node.");
29//!
30//!     let store = node
31//!         .add_extension(EventuallyConsistentStoreExtension::new(MemStore::default()))
32//!         .await
33//!         .expect("Create store.");
34//!     
35//!     let handle = store.handle();
36//!
37//!     handle
38//!         .put(
39//!             "my-keyspace",
40//!             1,
41//!             b"Hello, world! From keyspace 1.".to_vec(),
42//!             Consistency::All,
43//!         )
44//!         .await
45//!         .expect("Put doc.");
46//!     
47//!     Ok(())
48//! }
49//! ```
50//!
51//! ## Complete Examples
//! In-depth examples [can be found here](https://github.com/lnx-search/datacake/tree/main/examples).
53
54#[macro_use]
55extern crate tracing;
56
57mod core;
58mod error;
59mod keyspace;
60mod replication;
61mod rpc;
62mod statistics;
63mod storage;
64#[cfg(any(test, feature = "test-utils"))]
65pub mod test_utils;
66
67use std::borrow::Cow;
68use std::future::Future;
69use std::marker::PhantomData;
70use std::net::SocketAddr;
71use std::sync::Arc;
72use std::time::Duration;
73
74use async_trait::async_trait;
75use datacake_crdt::Key;
76use datacake_node::{
77    ClusterExtension,
78    Consistency,
79    ConsistencyError,
80    DatacakeHandle,
81    DatacakeNode,
82    Nodes,
83};
84pub use error::StoreError;
85use futures::stream::FuturesUnordered;
86use futures::StreamExt;
87pub use statistics::SystemStatistics;
88#[cfg(any(feature = "test-utils", feature = "test-suite"))]
89pub use storage::test_suite;
90pub use storage::{BulkMutationError, ProgressTracker, PutContext, Storage};
91
92pub use self::core::{Document, DocumentMetadata};
93use crate::core::DocVec;
94use crate::keyspace::{
95    Del,
96    KeyspaceGroup,
97    MultiDel,
98    MultiSet,
99    Set,
100    CONSISTENCY_SOURCE_ID,
101};
102use crate::replication::{
103    Mutation,
104    ReplicationCycleContext,
105    ReplicationHandle,
106    TaskDistributor,
107    TaskServiceContext,
108};
109use crate::rpc::services::consistency_impl::ConsistencyService;
110use crate::rpc::services::replication_impl::ReplicationService;
111use crate::rpc::ConsistencyClient;
112
/// Timeout duration reported in consistency-failure errors
/// (see `handle_consistency_distribution`).
const TIMEOUT: Duration = Duration::from_secs(2);
/// Default interval between repair (anti-entropy) cycles.
///
/// Shortened to 1 second under tests / the `test-utils` feature so repair
/// behaviour can be observed quickly.
const DEFAULT_REPAIR_INTERVAL: Duration = if cfg!(any(test, feature = "test-utils")) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 60) // 1 Hour
};
119
/// A cluster extension which adds an eventually consistent data store to a
/// [DatacakeNode].
///
/// The extension holds the application's storage backend and the repair
/// interval configuration; initialising it via [ClusterExtension] produces
/// the fully wired [EventuallyConsistentStore].
pub struct EventuallyConsistentStoreExtension<S>
where
    S: Storage,
{
    /// The application-provided storage backend the store wraps.
    datastore: S,
    /// How frequently the repair (anti-entropy) cycle should run.
    repair_interval: Duration,
}
136
137impl<S> EventuallyConsistentStoreExtension<S>
138where
139    S: Storage,
140{
141    /// Creates a new extension with a given data store, using the default repair
142    /// interval.
143    pub fn new(store: S) -> Self {
144        Self {
145            datastore: store,
146            repair_interval: DEFAULT_REPAIR_INTERVAL,
147        }
148    }
149
150    /// Set a custom repair interval rather than the default (1 hour.)
151    pub fn with_repair_interval(mut self, dur: Duration) -> Self {
152        self.repair_interval = dur;
153        self
154    }
155}
156
157#[async_trait]
158impl<S> ClusterExtension for EventuallyConsistentStoreExtension<S>
159where
160    S: Storage,
161{
162    type Output = EventuallyConsistentStore<S>;
163    type Error = StoreError<S::Error>;
164
165    async fn init_extension(
166        self,
167        node: &DatacakeNode,
168    ) -> Result<Self::Output, Self::Error> {
169        EventuallyConsistentStore::create(self.datastore, self.repair_interval, node)
170            .await
171    }
172}
173
/// A fully managed eventually consistent state controller.
///
/// The [EventuallyConsistentStore] manages all RPC and state propagation for
/// a given application, where the only setup required is the
/// RPC based configuration and the required handler traits
/// which wrap the application itself.
///
/// Datacake essentially acts as a frontend wrapper around a datastore
/// to make it distributed.
pub struct EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Handle to the local cluster node.
    node: DatacakeHandle,
    /// The set of keyspaces and their state, backed by the user's storage.
    group: KeyspaceGroup<S>,
    /// Background service which distributes mutations to peers.
    task_service: TaskDistributor,
    /// Background service running the periodic repair cycle.
    repair_service: ReplicationHandle,
    /// Live statistics for the store.
    statistics: SystemStatistics,
}
193
impl<S> EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Creates the store on top of an existing cluster node.
    ///
    /// This loads any persisted keyspace state, starts the background task
    /// distributor and repair services, spawns a watcher for membership
    /// changes, and registers the consistency and replication RPC services
    /// with the node.
    async fn create(
        datastore: S,
        repair_interval: Duration,
        node: &DatacakeNode,
    ) -> Result<Self, StoreError<S::Error>> {
        let storage = Arc::new(datastore);

        let group = KeyspaceGroup::new(storage.clone(), node.clock().clone()).await;
        let statistics = SystemStatistics::default();

        // Load the keyspace states.
        // NOTE: this happens before any background service starts so they
        // observe a fully populated group.
        group.load_states_from_storage().await?;

        let task_ctx = TaskServiceContext {
            clock: node.clock().clone(),
            network: node.network().clone(),
            local_node_id: node.me().node_id,
            public_node_addr: node.me().public_addr,
        };
        let replication_ctx = ReplicationCycleContext {
            repair_interval,
            group: group.clone(),
            network: node.network().clone(),
        };
        let task_service =
            replication::start_task_distributor_service::<S>(task_ctx).await;
        let repair_service = replication::start_replication_cycle(replication_ctx).await;

        // Keep both background services informed of cluster membership
        // changes for as long as the node handle produces them.
        tokio::spawn(watch_membership_changes(
            task_service.clone(),
            repair_service.clone(),
            node.handle(),
        ));

        // Expose the consistency and replication RPC endpoints to peers.
        node.add_rpc_service(ConsistencyService::new(
            group.clone(),
            node.network().clone(),
        ));
        node.add_rpc_service(ReplicationService::new(group.clone()));

        Ok(Self {
            node: node.handle(),
            group,
            statistics,
            task_service,
            repair_service,
        })
    }

    #[inline]
    /// Gets the live cluster statistics.
    pub fn statistics(&self) -> &SystemStatistics {
        &self.statistics
    }

    /// Creates a new handle to the underlying storage system.
    ///
    /// Changes applied to the handle are distributed across the cluster.
    pub fn handle(&self) -> ReplicatedStoreHandle<S> {
        ReplicatedStoreHandle {
            node: self.node.clone(),
            task_service: self.task_service.clone(),
            statistics: self.statistics.clone(),
            group: self.group.clone(),
        }
    }

    /// Creates a new handle to the underlying storage system with a preset keyspace.
    ///
    /// Changes applied to the handle are distributed across the cluster.
    pub fn handle_with_keyspace(
        &self,
        keyspace: impl Into<String>,
    ) -> ReplicatorKeyspaceHandle<S> {
        ReplicatorKeyspaceHandle {
            inner: self.handle(),
            keyspace: Cow::Owned(keyspace.into()),
        }
    }
}
278
impl<S> Drop for EventuallyConsistentStore<S>
where
    S: Storage,
{
    /// Shuts down the background task distributor and repair services when
    /// the store is dropped.
    fn drop(&mut self) {
        self.task_service.kill();
        self.repair_service.kill();
    }
}
288
/// A cheaply cloneable handle to control the data store.
pub struct ReplicatedStoreHandle<S>
where
    S: Storage,
{
    /// Handle to the local cluster node, used for node selection, clock
    /// access and network connections.
    node: DatacakeHandle,
    /// Gives access to local storage and the per-keyspace state.
    group: KeyspaceGroup<S>,
    /// Distributor used to register mutations for background replication.
    task_service: TaskDistributor,
    /// Live statistics shared with the parent store.
    statistics: SystemStatistics,
}
299
300impl<S> Clone for ReplicatedStoreHandle<S>
301where
302    S: Storage,
303{
304    fn clone(&self) -> Self {
305        Self {
306            node: self.node.clone(),
307            group: self.group.clone(),
308            task_service: self.task_service.clone(),
309            statistics: self.statistics.clone(),
310        }
311    }
312}
313
314impl<S> ReplicatedStoreHandle<S>
315where
316    S: Storage,
317{
318    #[inline]
319    /// Gets the live cluster statistics.
320    pub fn statistics(&self) -> &SystemStatistics {
321        &self.statistics
322    }
323
324    /// Creates a new handle to the underlying storage system with a preset keyspace.
325    ///
326    /// Changes applied to the handle are distributed across the cluster.
327    pub fn with_keyspace(
328        &self,
329        keyspace: impl Into<String>,
330    ) -> ReplicatorKeyspaceHandle<S> {
331        ReplicatorKeyspaceHandle {
332            inner: self.clone(),
333            keyspace: Cow::Owned(keyspace.into()),
334        }
335    }
336
337    /// Retrieves a document from the underlying storage.
338    pub async fn get(
339        &self,
340        keyspace: &str,
341        doc_id: Key,
342    ) -> Result<Option<Document>, S::Error> {
343        let storage = self.group.storage();
344        storage.get(keyspace, doc_id).await
345    }
346
347    /// Retrieves a set of documents from the underlying storage.
348    ///
349    /// If a document does not exist with the given ID, it is simply not part
350    /// of the returned iterator.
351    pub async fn get_many<I, T>(
352        &self,
353        keyspace: &str,
354        doc_ids: I,
355    ) -> Result<S::DocsIter, S::Error>
356    where
357        T: Iterator<Item = Key> + Send,
358        I: IntoIterator<IntoIter = T> + Send,
359    {
360        let storage = self.group.storage();
361        storage.multi_get(keyspace, doc_ids.into_iter()).await
362    }
363
364    /// Insert or update a single document into the datastore.
365    pub async fn put<D>(
366        &self,
367        keyspace: &str,
368        doc_id: Key,
369        data: D,
370        consistency: Consistency,
371    ) -> Result<(), StoreError<S::Error>>
372    where
373        D: Into<Vec<u8>>,
374    {
375        let nodes = self
376            .node
377            .select_nodes(consistency)
378            .await
379            .map_err(StoreError::ConsistencyError)?;
380
381        let last_updated = self.node.clock().get_time().await;
382        let document = Document::new(doc_id, last_updated, data);
383
384        let keyspace = self.group.get_or_create_keyspace(keyspace).await;
385        let msg = Set {
386            source: CONSISTENCY_SOURCE_ID,
387            doc: document.clone(),
388            ctx: None,
389            _marker: PhantomData::<S>::default(),
390        };
391        keyspace.send(msg).await?;
392
393        // Register mutation with the distributor service.
394        self.task_service.mutation(Mutation::Put {
395            keyspace: Cow::Owned(keyspace.name().to_string()),
396            doc: document.clone(),
397        });
398
399        let factory = |node| {
400            let clock = self.node.clock().clone();
401            let keyspace = keyspace.name().to_string();
402            let document = document.clone();
403            async move {
404                let channel = self.node.network().get_or_connect(node);
405
406                let mut client = ConsistencyClient::<S>::new(clock, channel);
407
408                client
409                    .put(
410                        keyspace,
411                        document,
412                        self.node.me().node_id,
413                        self.node.me().public_addr,
414                    )
415                    .await
416                    .map_err(|e| StoreError::RpcError(node, e))?;
417
418                Ok::<_, StoreError<S::Error>>(())
419            }
420        };
421
422        handle_consistency_distribution::<S, _, _>(nodes, factory).await
423    }
424
425    /// Insert or update multiple documents into the datastore at once.
426    pub async fn put_many<I, T, D>(
427        &self,
428        keyspace: &str,
429        documents: I,
430        consistency: Consistency,
431    ) -> Result<(), StoreError<S::Error>>
432    where
433        D: Into<Vec<u8>>,
434        T: Iterator<Item = (Key, D)> + Send,
435        I: IntoIterator<IntoIter = T> + Send,
436    {
437        let nodes = self
438            .node
439            .select_nodes(consistency)
440            .await
441            .map_err(StoreError::ConsistencyError)?;
442
443        let last_updated = self.node.clock().get_time().await;
444        let docs = documents
445            .into_iter()
446            .map(|(id, data)| Document::new(id, last_updated, data))
447            .collect::<DocVec<_>>();
448
449        let keyspace = self.group.get_or_create_keyspace(keyspace).await;
450        let msg = MultiSet {
451            source: CONSISTENCY_SOURCE_ID,
452            docs: docs.clone(),
453            ctx: None,
454            _marker: PhantomData::<S>::default(),
455        };
456        keyspace.send(msg).await?;
457
458        // Register mutation with the distributor service.
459        self.task_service.mutation(Mutation::MultiPut {
460            keyspace: Cow::Owned(keyspace.name().to_string()),
461            docs: docs.clone(),
462        });
463
464        let factory = |node| {
465            let clock = self.node.clock().clone();
466            let keyspace = keyspace.name().to_string();
467            let documents = docs.clone();
468            let self_member = self.node.me().clone();
469            async move {
470                let channel = self.node.network().get_or_connect(node);
471
472                let mut client = ConsistencyClient::<S>::new(clock, channel);
473
474                client
475                    .multi_put(
476                        keyspace,
477                        documents.into_iter(),
478                        self_member.node_id,
479                        self_member.public_addr,
480                    )
481                    .await
482                    .map_err(|e| StoreError::RpcError(node, e))?;
483
484                Ok::<_, StoreError<S::Error>>(())
485            }
486        };
487
488        handle_consistency_distribution::<S, _, _>(nodes, factory).await
489    }
490
491    /// Delete a document from the datastore with a given doc ID.
492    pub async fn del(
493        &self,
494        keyspace: &str,
495        doc_id: Key,
496        consistency: Consistency,
497    ) -> Result<(), StoreError<S::Error>> {
498        let nodes = self
499            .node
500            .select_nodes(consistency)
501            .await
502            .map_err(StoreError::ConsistencyError)?;
503
504        let last_updated = self.node.clock().get_time().await;
505
506        let keyspace = self.group.get_or_create_keyspace(keyspace).await;
507        let doc = DocumentMetadata {
508            id: doc_id,
509            last_updated,
510        };
511        let msg = Del {
512            source: CONSISTENCY_SOURCE_ID,
513            doc,
514            _marker: PhantomData::<S>::default(),
515        };
516        keyspace.send(msg).await?;
517
518        // Register mutation with the distributor service.
519        self.task_service.mutation(Mutation::Del {
520            keyspace: Cow::Owned(keyspace.name().to_string()),
521            doc,
522        });
523
524        let factory = |node| {
525            let clock = self.node.clock().clone();
526            let keyspace = keyspace.name().to_string();
527            async move {
528                let channel = self.node.network().get_or_connect(node);
529
530                let mut client = ConsistencyClient::<S>::new(clock, channel);
531
532                client
533                    .del(keyspace, doc_id, last_updated)
534                    .await
535                    .map_err(|e| StoreError::RpcError(node, e))?;
536
537                Ok::<_, StoreError<S::Error>>(())
538            }
539        };
540
541        handle_consistency_distribution::<S, _, _>(nodes, factory).await
542    }
543
544    /// Delete multiple documents from the datastore from the set of doc IDs.
545    pub async fn del_many<I, T>(
546        &self,
547        keyspace: &str,
548        doc_ids: I,
549        consistency: Consistency,
550    ) -> Result<(), StoreError<S::Error>>
551    where
552        T: Iterator<Item = Key> + Send,
553        I: IntoIterator<IntoIter = T> + Send,
554    {
555        let nodes = self
556            .node
557            .select_nodes(consistency)
558            .await
559            .map_err(StoreError::ConsistencyError)?;
560
561        let last_updated = self.node.clock().get_time().await;
562        let docs = doc_ids
563            .into_iter()
564            .map(|id| DocumentMetadata { id, last_updated })
565            .collect::<DocVec<_>>();
566
567        let keyspace = self.group.get_or_create_keyspace(keyspace).await;
568        let msg = MultiDel {
569            source: CONSISTENCY_SOURCE_ID,
570            docs: docs.clone(),
571            _marker: PhantomData::<S>::default(),
572        };
573        keyspace.send(msg).await?;
574
575        // Register mutation with the distributor service.
576        self.task_service.mutation(Mutation::MultiDel {
577            keyspace: Cow::Owned(keyspace.name().to_string()),
578            docs: docs.clone(),
579        });
580
581        let factory = |node| {
582            let clock = self.node.clock().clone();
583            let keyspace = keyspace.name().to_string();
584            let docs = docs.clone();
585            async move {
586                let channel = self.node.network().get_or_connect(node);
587
588                let mut client = ConsistencyClient::<S>::new(clock, channel);
589
590                client
591                    .multi_del(keyspace, docs)
592                    .await
593                    .map_err(|e| StoreError::RpcError(node, e))?;
594
595                Ok::<_, StoreError<S::Error>>(())
596            }
597        };
598
599        handle_consistency_distribution::<S, _, _>(nodes, factory).await
600    }
601}
602
/// A convenience wrapper which creates a new handle with a preset keyspace.
pub struct ReplicatorKeyspaceHandle<S>
where
    S: Storage,
{
    /// The full store handle every operation delegates to.
    inner: ReplicatedStoreHandle<S>,
    /// The keyspace name passed on every delegated call.
    keyspace: Cow<'static, str>,
}
611
612impl<S> Clone for ReplicatorKeyspaceHandle<S>
613where
614    S: Storage,
615{
616    fn clone(&self) -> Self {
617        Self {
618            inner: self.inner.clone(),
619            keyspace: self.keyspace.clone(),
620        }
621    }
622}
623
624impl<S> ReplicatorKeyspaceHandle<S>
625where
626    S: Storage,
627{
628    /// Retrieves a document from the underlying storage.
629    pub async fn get(&self, doc_id: Key) -> Result<Option<Document>, S::Error> {
630        self.inner.get(self.keyspace.as_ref(), doc_id).await
631    }
632
633    /// Retrieves a set of documents from the underlying storage.
634    ///
635    /// If a document does not exist with the given ID, it is simply not part
636    /// of the returned iterator.
637    pub async fn get_many<I, T>(&self, doc_ids: I) -> Result<S::DocsIter, S::Error>
638    where
639        T: Iterator<Item = Key> + Send,
640        I: IntoIterator<IntoIter = T> + Send,
641    {
642        self.inner.get_many(self.keyspace.as_ref(), doc_ids).await
643    }
644
645    /// Insert or update a single document into the datastore.
646    pub async fn put(
647        &self,
648        doc_id: Key,
649        data: Vec<u8>,
650        consistency: Consistency,
651    ) -> Result<(), StoreError<S::Error>> {
652        self.inner
653            .put(self.keyspace.as_ref(), doc_id, data, consistency)
654            .await
655    }
656
657    /// Insert or update multiple documents into the datastore at once.
658    pub async fn put_many<I, T>(
659        &self,
660        documents: I,
661        consistency: Consistency,
662    ) -> Result<(), StoreError<S::Error>>
663    where
664        T: Iterator<Item = (Key, Vec<u8>)> + Send,
665        I: IntoIterator<IntoIter = T> + Send,
666    {
667        self.inner
668            .put_many(self.keyspace.as_ref(), documents, consistency)
669            .await
670    }
671
672    /// Delete a document from the datastore with a given doc ID.
673    pub async fn del(
674        &self,
675        doc_id: Key,
676        consistency: Consistency,
677    ) -> Result<(), StoreError<S::Error>> {
678        self.inner
679            .del(self.keyspace.as_ref(), doc_id, consistency)
680            .await
681    }
682
683    /// Delete multiple documents from the datastore from the set of doc IDs.
684    pub async fn del_many<I, T>(
685        &self,
686        doc_ids: I,
687        consistency: Consistency,
688    ) -> Result<(), StoreError<S::Error>>
689    where
690        T: Iterator<Item = Key> + Send,
691        I: IntoIterator<IntoIter = T> + Send,
692    {
693        self.inner
694            .del_many(self.keyspace.as_ref(), doc_ids, consistency)
695            .await
696    }
697}
698
699/// Watches for changes in the cluster membership.
700///
701/// When nodes leave and join, pollers are stopped and started as required.
702async fn watch_membership_changes(
703    task_service: TaskDistributor,
704    repair_service: ReplicationHandle,
705    node_handle: DatacakeHandle,
706) {
707    let mut changes = node_handle.membership_changes();
708    while let Some(members) = changes.next().await {
709        task_service.membership_change(members.clone());
710        repair_service.membership_change(members.clone());
711    }
712}
713
714async fn handle_consistency_distribution<S, CB, F>(
715    nodes: Nodes,
716    factory: CB,
717) -> Result<(), StoreError<S::Error>>
718where
719    S: Storage,
720    CB: FnMut(SocketAddr) -> F,
721    F: Future<Output = Result<(), StoreError<S::Error>>>,
722{
723    let mut num_success = 0;
724    let num_required = nodes.len();
725
726    let mut requests = nodes
727        .into_iter()
728        .map(factory)
729        .collect::<FuturesUnordered<_>>();
730
731    while let Some(res) = requests.next().await {
732        match res {
733            Ok(()) => {
734                num_success += 1;
735            },
736            Err(StoreError::RpcError(node, error)) => {
737                error!(
738                    error = ?error,
739                    target_node = %node,
740                    "Replica failed to acknowledge change to meet consistency level requirement."
741                );
742            },
743            Err(StoreError::TransportError(node, error)) => {
744                error!(
745                    error = ?error,
746                    target_node = %node,
747                    "Replica failed to acknowledge change to meet consistency level requirement."
748                );
749            },
750            Err(other) => {
751                error!(
752                    error = ?other,
753                    "Failed to send action to replica due to unknown error.",
754                );
755            },
756        }
757    }
758
759    if num_success != num_required {
760        Err(StoreError::ConsistencyError(
761            ConsistencyError::ConsistencyFailure {
762                responses: num_success,
763                required: num_required,
764                timeout: TIMEOUT,
765            },
766        ))
767    } else {
768        Ok(())
769    }
770}