Struct cnr::Replica [−][src]
Expand description
An instance of a replicated data structure. Uses one or more shared logs to scale operations on the data structure across cores and processors.
Takes in one type argument: D
represents the underlying concurrent data
structure. D
must implement the Dispatch
trait.
A thread can be registered against the replica by calling register()
. A
mutable operation can be issued by calling execute_mut()
(immutable uses
execute
). A mutable operation will be eventually executed against the replica
along with any operations that were received on other replicas that share
the same underlying log.
Implementations
Constructs an instance of a replicated data structure.
Takes references to all the shared logs as an argument. The Logs are assumed to outlive the replica. The replica is bound to the log’s lifetime.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; // The data structure we want replicated. #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } // This trait allows the `Data` to be used with node-replication. impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; // A read returns the underlying u64. fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } // A write updates the underlying u64. fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); None } } // Create one or more logs. let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); // Create a replica that uses the above log. let replica = Replica::<Data>::new(vec![log]);
Similar to Replica<D>::new
, but we pass a pre-initialized
data-structure as an argument (d
) rather than relying on the
Default trait to create one.
Note
Replica<D>::new
should be the preferred method to create a Replica.
If with_data
is used, care must be taken that the same state is passed
to every Replica object. If not the resulting operations executed
against replicas may not give deterministic results.
Registers a thread with this replica. Returns an idx inside an Option if the registration was successfull. None if the registration failed.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); None } } let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); let replica = Replica::<Data>::new(vec![log]); // Calling register() returns an idx that can be used to execute // operations against the replica. let idx = replica.register().expect("Failed to register with replica.");
pub fn execute_mut(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
pub fn execute_mut(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
Executes an mutable operation against this replica and returns a response.
idx
is an identifier for the thread performing the execute operation.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); None } } let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); let replica = Replica::<Data>::new(vec![log]); let idx = replica.register().expect("Failed to register with replica."); // execute_mut() can be used to write to the replicated data structure. let res = replica.execute_mut(OpWr(100), idx); assert_eq!(None, res);
pub fn execute_mut_scan(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
pub fn execute_mut_scan(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
This method executes an mutable operation against this replica that depends on multiple logs and returns a response.
idx
is an identifier for the thread performing the execute operation.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); Some(op.0) } } let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); let replica = Replica::<Data>::new(vec![log]); let idx = replica.register().expect("Failed to register with replica."); // execute_mut_scan() can be used to write to the replicated data structure // through all the logs. let res = replica.execute_mut_scan(OpWr(100), idx); assert_eq!(Some(100), res);
pub fn execute(
&self,
op: <D as Dispatch>::ReadOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
pub fn execute(
&self,
op: <D as Dispatch>::ReadOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
Executes a read-only operation against this replica and returns a response.
idx
is an identifier for the thread performing the execute operation.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); None } } let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); let replica = Replica::<Data>::new(vec![log]); let idx = replica.register().expect("Failed to register with replica."); let _wr = replica.execute_mut(OpWr(100), idx); // execute() can be used to read from the replicated data structure. let res = replica.execute(OpRd(()), idx); assert_eq!(Some(100), res);
pub fn execute_scan(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
pub fn execute_scan(
&self,
op: <D as Dispatch>::WriteOperation,
idx: ReplicaToken
) -> <D as Dispatch>::Response
This method executes an mutable operation against this replica that depends on multiple logs and returns a response.
idx
is an identifier for the thread performing the execute operation.
Example
use cnr::Dispatch; use cnr::Log; use cnr::LogMapper; use cnr::Replica; use core::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; #[derive(Default)] struct Data { junk: AtomicUsize, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpWr(pub usize); impl LogMapper for OpWr { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub struct OpRd(()); impl LogMapper for OpRd { // Only one log used for the example, hence returning 0. fn hash(&self, _nlogs:usize, logs: &mut Vec<usize>) { logs.clear(); logs.push(0); } } impl Dispatch for Data { type ReadOperation = OpRd; type WriteOperation = OpWr; type Response = Option<usize>; fn dispatch( &self, _op: Self::ReadOperation, ) -> Self::Response { Some(self.junk.load(Ordering::Relaxed)) } fn dispatch_mut( &self, op: Self::WriteOperation, ) -> Self::Response { self.junk.store(op.0, Ordering::Relaxed); Some(op.0) } } let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default()); let replica = Replica::<Data>::new(vec![log]); let idx = replica.register().expect("Failed to register with replica."); // execute_mut_scan() can be used to write to the replicated data structure // through all the logs. let res = replica.execute_mut_scan(OpWr(100), idx); assert_eq!(Some(100), res);
This method is useful when a replica stops making progress and some threads on another replica are still active. The active replica will use all the entries in the log and won’t be able perform garbage collection because of the inactive replica. So, this method syncs up the replica against the underlying log.
This method is useful to when a replica stops consuming a particular log and some other replica is still using this a log.
No need to run in a loop because the replica will be synced for log_id if there is an active combiner.
Trait Implementations
The Replica is Sync. Member variables are protected by a CAS on combiner
.
Contexts are thread-safe.
Auto Trait Implementations
impl<'a, D> !RefUnwindSafe for Replica<'a, D>
impl<'a, D> Unpin for Replica<'a, D> where
D: Unpin,
<D as Dispatch>::Response: Unpin,
<D as Dispatch>::WriteOperation: Unpin,
impl<'a, D> !UnwindSafe for Replica<'a, D>