Struct cnr::Replica

pub struct Replica<'a, D> where
    D: Sized + Dispatch + Sync
{ /* fields omitted */ }
An instance of a replicated data structure. Uses one or more shared logs to scale operations on the data structure across cores and processors.

Takes in one type argument: D represents the underlying concurrent data structure. D must implement the Dispatch trait.

A thread registers with the replica by calling register(). It can then issue a mutable operation with execute_mut() or a read-only operation with execute(). A mutable operation is eventually executed against this replica, along with any operations received on other replicas that share the same underlying log.
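The replay behaviour described above can be illustrated with a small model. This is a minimal sketch that uses only the standard library; `ToyLog`, `ToyReplica` and `Op` are hypothetical stand-ins invented for illustration and are not the cnr types. It assumes that each replica keeps a private copy of the state and a cursor into one shared log, and that it replays unseen entries before answering.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for illustration; these are not the cnr types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Set(usize),
    Add(usize),
}

/// A toy append-only log shared by all replicas.
#[derive(Default)]
struct ToyLog {
    entries: Mutex<Vec<Op>>,
}

/// A toy replica: a private copy of the state plus a cursor into the log.
struct ToyReplica {
    log: Arc<ToyLog>,
    applied: usize, // number of log entries already replayed locally
    state: usize,
}

impl ToyReplica {
    fn new(log: Arc<ToyLog>) -> Self {
        ToyReplica { log, applied: 0, state: 0 }
    }

    /// Replay every log entry this replica has not seen yet.
    fn catch_up(&mut self) {
        let entries = self.log.entries.lock().unwrap();
        for op in &entries[self.applied..] {
            match *op {
                Op::Set(v) => self.state = v,
                Op::Add(v) => self.state += v,
            }
        }
        self.applied = entries.len();
    }

    /// Append a mutable operation to the shared log, then replay up to it.
    fn execute_mut(&mut self, op: Op) -> usize {
        self.log.entries.lock().unwrap().push(op);
        self.catch_up();
        self.state
    }

    /// Reads catch up with the log first so they observe earlier writes.
    fn execute(&mut self) -> usize {
        self.catch_up();
        self.state
    }
}

fn main() {
    let log = Arc::new(ToyLog::default());
    let mut r1 = ToyReplica::new(Arc::clone(&log));
    let mut r2 = ToyReplica::new(Arc::clone(&log));

    r1.execute_mut(Op::Set(100));
    r2.execute_mut(Op::Add(5));

    // Both replicas replay the same entries in the same order.
    assert_eq!(r1.execute(), 105);
    assert_eq!(r2.execute(), 105);
}
```

The model uses a single mutex-protected log for brevity; the real crate supports several logs and is not implemented this way.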

Implementations

Constructs an instance of a replicated data structure.

Takes references to all the shared logs as an argument. The logs are assumed to outlive the replica; the replica is bound to the logs’ lifetime.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// The data structure we want replicated.
#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

// This trait allows the `Data` to be used with node-replication.
impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    // A read returns the underlying usize.
    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    // A write updates the underlying usize.
    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        None
    }
}

// Create one or more logs.
let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());

// Create a replica that uses the above log.
let replica = Replica::<Data>::new(vec![log]);
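The LogMapper implementations in the example above always select log 0 because only one log exists. With several logs, a mapper would typically spread operations across them. The sketch below is an assumption about how such a mapping could look, not code taken from the crate: the local `LogMapper` trait is a hypothetical stand-in that only mirrors the `hash` signature shown above, and `KvWrite` is an invented operation type. It assumes that operations on the same key must land in the same log, and that an operation touching every key must name every log.

```rust
// Hypothetical stand-in mirroring the `hash` signature used above.
// In real code, implement `cnr::LogMapper` instead of this local trait.
trait LogMapper {
    fn hash(&self, nlogs: usize, logs: &mut Vec<usize>);
}

// Invented write operations for a key-value store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KvWrite {
    Put { key: usize, value: usize },
    Clear,
}

impl LogMapper for KvWrite {
    fn hash(&self, nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        match *self {
            // Writes to the same key always map to the same log.
            KvWrite::Put { key, .. } => logs.push(key % nlogs),
            // An operation that affects every key names every log.
            KvWrite::Clear => logs.extend(0..nlogs),
        }
    }
}

fn main() {
    let mut logs = Vec::new();

    KvWrite::Put { key: 10, value: 1 }.hash(4, &mut logs);
    assert_eq!(logs, vec![2]);

    KvWrite::Clear.hash(4, &mut logs);
    assert_eq!(logs, vec![0, 1, 2, 3]);
}
```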

Similar to Replica<D>::new, but we pass a pre-initialized data-structure as an argument (d) rather than relying on the Default trait to create one.

Note

Replica<D>::new should be the preferred method to create a Replica. If with_data is used, care must be taken that the same state is passed to every Replica object. Otherwise, operations executed against the replicas may not produce deterministic results.
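Why the initial state matters can be shown with a toy model. This is a sketch under stated assumptions, not the crate's implementation: `ToyReplica` and `Op` are hypothetical types, and the `with_data` function here only borrows the name to show the idea. Each replica replays the same sequence of operations, so replicas agree only if they started from the same state.

```rust
// Hypothetical toy types for illustration; these are not the cnr types.
#[derive(Clone, Copy)]
enum Op {
    Add(usize),
    Double,
}

struct ToyReplica {
    state: usize,
}

impl ToyReplica {
    /// Start from a caller-supplied state instead of a default one.
    fn with_data(initial: usize) -> Self {
        ToyReplica { state: initial }
    }

    /// Apply every logged operation in order.
    fn replay(&mut self, log: &[Op]) {
        for op in log {
            match *op {
                Op::Add(v) => self.state += v,
                Op::Double => self.state *= 2,
            }
        }
    }
}

fn main() {
    let log = [Op::Add(3), Op::Double];

    let mut a = ToyReplica::with_data(1);
    let mut b = ToyReplica::with_data(1);
    let mut c = ToyReplica::with_data(2); // different initial state

    a.replay(&log);
    b.replay(&log);
    c.replay(&log);

    // Same initial state and same log: the replicas agree.
    assert_eq!(a.state, b.state);
    // Different initial state: the replicas diverge on the same log.
    assert_ne!(a.state, c.state);
}
```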

Registers a thread with this replica. Returns an idx inside an Option if the registration was successful, or None if the registration failed.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        None
    }
}

let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let replica = Replica::<Data>::new(vec![log]);

// Calling register() returns an idx that can be used to execute
// operations against the replica.
let idx = replica.register().expect("Failed to register with replica.");

Executes a mutable operation against this replica and returns a response. idx is an identifier for the thread performing the execute operation.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        None
    }
}

let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let replica = Replica::<Data>::new(vec![log]);
let idx = replica.register().expect("Failed to register with replica.");

// execute_mut() can be used to write to the replicated data structure.
let res = replica.execute_mut(OpWr(100), idx);
assert_eq!(None, res);

Executes a mutable operation that depends on multiple logs against this replica and returns a response.

idx is an identifier for the thread performing the execute operation.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        Some(op.0)
    }
}

let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let replica = Replica::<Data>::new(vec![log]);
let idx = replica.register().expect("Failed to register with replica.");

// execute_mut_scan() can be used to write to the replicated data structure
// through all the logs.
let res = replica.execute_mut_scan(OpWr(100), idx);
assert_eq!(Some(100), res);

Executes a read-only operation against this replica and returns a response. idx is an identifier for the thread performing the execute operation.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        None
    }
}

let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let replica = Replica::<Data>::new(vec![log]);
let idx = replica.register().expect("Failed to register with replica.");
let _wr = replica.execute_mut(OpWr(100), idx);

// execute() can be used to read from the replicated data structure.
let res = replica.execute(OpRd(()), idx);
assert_eq!(Some(100), res);

Executes a mutable operation that depends on multiple logs against this replica and returns a response.

idx is an identifier for the thread performing the execute operation.

Example

use cnr::Dispatch;
use cnr::Log;
use cnr::LogMapper;
use cnr::Replica;

use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Data {
    junk: AtomicUsize,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpWr(pub usize);

impl LogMapper for OpWr {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct OpRd(());

impl LogMapper for OpRd {
    // Only one log used for the example, hence returning 0.
    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
        logs.clear();
        logs.push(0);
    }
}

impl Dispatch for Data {
    type ReadOperation = OpRd;
    type WriteOperation = OpWr;
    type Response = Option<usize>;

    fn dispatch(
        &self,
        _op: Self::ReadOperation,
    ) -> Self::Response {
        Some(self.junk.load(Ordering::Relaxed))
    }

    fn dispatch_mut(
        &self,
        op: Self::WriteOperation,
    ) -> Self::Response {
        self.junk.store(op.0, Ordering::Relaxed);
        Some(op.0)
    }
}

let log = Arc::new(Log::<<Data as Dispatch>::WriteOperation>::default());
let replica = Replica::<Data>::new(vec![log]);
let idx = replica.register().expect("Failed to register with replica.");

// execute_mut_scan() can be used to write to the replicated data structure
// through all the logs.
let res = replica.execute_mut_scan(OpWr(100), idx);
assert_eq!(Some(100), res);

This method is useful when a replica stops making progress while threads on another replica are still active. The active replica will eventually use up all the entries in the log and won’t be able to perform garbage collection because of the inactive replica. Calling this method syncs the replica up against the underlying log.

This method is useful when a replica stops consuming a particular log while some other replica is still using that log.

No need to run in a loop because the replica will be synced for log_id if there is an active combiner.
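The garbage-collection problem described above can be modelled in a few lines. This is a minimal sketch, assuming a bounded log whose slots may only be reused once every replica has consumed them; `ToyLog` and all of its methods are hypothetical and do not reflect the crate's internals.

```rust
/// Hypothetical bounded log. Slots can only be reused once every replica
/// has consumed them, so the slowest replica limits garbage collection.
struct ToyLog {
    capacity: usize,
    appended: usize,     // total number of entries ever appended
    cursors: Vec<usize>, // per replica: number of entries consumed so far
}

impl ToyLog {
    fn new(capacity: usize, replicas: usize) -> Self {
        ToyLog { capacity, appended: 0, cursors: vec![0; replicas] }
    }

    /// Entries below this index have been consumed by every replica.
    fn head(&self) -> usize {
        self.cursors.iter().copied().min().unwrap_or(self.appended)
    }

    /// Append on behalf of `replica`; fails when no slot can be reclaimed.
    fn try_append(&mut self, replica: usize) -> bool {
        if self.appended - self.head() >= self.capacity {
            return false;
        }
        self.appended += 1;
        // In this toy, the appender also consumes everything up to its entry.
        self.cursors[replica] = self.appended;
        true
    }

    /// Bring `replica` up to date with everything in the log.
    fn sync(&mut self, replica: usize) {
        self.cursors[replica] = self.appended;
    }
}

fn main() {
    let mut log = ToyLog::new(4, 2);

    // Replica 0 is active, replica 1 is idle.
    for _ in 0..4 {
        assert!(log.try_append(0));
    }
    // The log is full and nothing can be reclaimed because of replica 1.
    assert!(!log.try_append(0));

    // Syncing the idle replica lets the log reclaim space again.
    log.sync(1);
    assert!(log.try_append(0));
}
```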

Trait Implementations

Formats the value using the given formatter.

The Replica is Sync. Member variables are protected by a CAS on combiner. Contexts are thread-safe.
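To illustrate what protecting state with a CAS on a combiner can look like, here is a minimal sketch built on std::sync::atomic. The `Combiner` type and its methods are hypothetical, and the sketch simplifies by having every thread spin until it holds the lock; it is not a description of how the crate's combiner actually batches operations.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical combiner lock: 0 means free, any other value is the id of
/// the thread that currently acts as combiner.
struct Combiner {
    owner: AtomicUsize,
}

impl Combiner {
    /// Try to become the combiner with a single compare-and-swap.
    fn try_acquire(&self, tid: usize) -> bool {
        self.owner
            .compare_exchange(0, tid, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Give up the combiner role.
    fn release(&self) {
        self.owner.store(0, Ordering::Release);
    }
}

fn main() {
    let combiner = Arc::new(Combiner { owner: AtomicUsize::new(0) });
    let applied = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (1..=4)
        .map(|tid| {
            let combiner = Arc::clone(&combiner);
            let applied = Arc::clone(&applied);
            thread::spawn(move || {
                // Spin until this thread wins the CAS.
                while !combiner.try_acquire(tid) {
                    std::hint::spin_loop();
                }
                // Critical section: a plain read-modify-write that is only
                // safe because a single thread holds the combiner role.
                let v = applied.load(Ordering::Relaxed);
                applied.store(v + 1, Ordering::Relaxed);
                combiner.release();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(applied.load(Ordering::Relaxed), 4);
}
```

Thread ids start at 1 in this sketch because 0 is reserved to mean that no thread holds the combiner role.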
