Module openraft::docs::getting_started
§Getting Started with Openraft
In this chapter, we will build a key-value store cluster using Openraft.
- examples/raft-kv-memstore is a complete example application that includes the server, client, and a demo cluster. This example uses an in-memory store for data storage.
- examples/raft-kv-rocksdb is another complete example application that includes the server, client, and a demo cluster. This example uses RocksDB for persistent storage.
You can use these examples as a starting point for building your own key-value store cluster with Openraft.
Raft is a distributed consensus protocol designed to manage a replicated log containing state machine commands from clients.
Raft includes two major parts:
- Replicating logs consistently among nodes,
- Consuming the logs, which is defined mainly by the state machine.
Implementing your own Raft-based application with Openraft is quite simple, and it involves:
- Defining client request and response,
- Implementing storage for Raft to persist its state,
- Implementing a network layer for Raft to transmit messages.
§1. Define client request and response
A request is some data that modifies the Raft state machine. A response is some data that the Raft state machine returns to the client.
Request and response can be any types that implement AppData and AppDataResponse, for example:
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request { key: String }
// `ClientError` is an application-defined error type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Response(Result<Option<String>, ClientError>);
These two types are entirely application-specific and relate mainly to the state machine implementation in RaftStateMachine.
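The following std-only sketch shows how a state machine might consume a request and produce a response. It assumes a hypothetical request enum with `Set` and `Get` variants, backed by an in-memory `BTreeMap`. It does not use Openraft's traits. `KvRequest`, `KvResponse`, and `KvStateMachine` are illustrative names only.

```rust
use std::collections::BTreeMap;

/// Hypothetical client request: the application decides its shape.
#[derive(Clone, Debug, PartialEq)]
pub enum KvRequest {
    Set { key: String, value: String },
    Get { key: String },
}

/// Hypothetical response returned to the client once the request is applied.
#[derive(Clone, Debug, PartialEq)]
pub struct KvResponse {
    pub value: Option<String>,
}

/// Toy state machine: applying a committed request mutates or reads the map.
#[derive(Default)]
pub struct KvStateMachine {
    data: BTreeMap<String, String>,
}

impl KvStateMachine {
    pub fn apply(&mut self, req: &KvRequest) -> KvResponse {
        match req {
            // A write returns the previous value, if there was one.
            KvRequest::Set { key, value } => KvResponse {
                value: self.data.insert(key.clone(), value.clone()),
            },
            KvRequest::Get { key } => KvResponse {
                value: self.data.get(key).cloned(),
            },
        }
    }
}
```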
§2. Define types for the application
Openraft is a generic implementation of Raft. It requires the application to define concrete types for its generic arguments. Most types are parameterized by RaftTypeConfig and will be used to create a Raft instance:
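#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
pub struct TypeConfig {}
impl openraft::RaftTypeConfig for TypeConfig {
type D = Request;
type R = Response;
type NodeId = NodeId;
type Node = BasicNode;
type Entry = openraft::Entry<TypeConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
}
// Simplified for illustration: `Raft` is generic over the application's `RaftTypeConfig`.
pub struct Raft<C: RaftTypeConfig> {}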
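RaftTypeConfig follows a common Rust pattern. A single marker type bundles the concrete types as associated types, and generic components take that one parameter instead of many. The std-only sketch below imitates the pattern with a hypothetical `MiniTypeConfig` trait. It is not Openraft's trait and omits most of its associated types.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for a type-config trait: each associated type
/// names one concrete type that the generic code needs.
pub trait MiniTypeConfig: 'static {
    type D: Clone + Debug; // application request
    type R: Clone + Debug; // application response
    type NodeId: Copy + Ord + Debug;
}

/// A generic component parameterized by the single config type `C`.
pub struct MiniRaft<C: MiniTypeConfig> {
    pub id: C::NodeId,
    pub pending: Vec<C::D>,
}

impl<C: MiniTypeConfig> MiniRaft<C> {
    pub fn new(id: C::NodeId) -> Self {
        Self { id, pending: Vec::new() }
    }

    /// Queue a request; its type comes from the config, not a separate generic.
    pub fn propose(&mut self, req: C::D) {
        self.pending.push(req);
    }
}

/// The application's concrete config: an empty marker type.
pub struct AppConfig;

impl MiniTypeConfig for AppConfig {
    type D = String;
    type R = Option<String>;
    type NodeId = u64;
}
```

Changing a single `impl` block swaps the request or node-id type everywhere `MiniRaft<AppConfig>` is used.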
Openraft provides default implementations for Node (EmptyNode and BasicNode) and log Entry (Entry).
You can use these implementations directly or define your own custom types.
A RaftTypeConfig is also used by other components such as RaftLogStorage, RaftStateMachine, RaftNetworkFactory and RaftNetwork.
§3. Implement RaftLogStorage and RaftStateMachine
The trait RaftLogStorage defines how log data is stored and consumed. It could be a wrapper for a local key-value store like RocksDB.
The trait RaftStateMachine defines how log entries are interpreted. Usually it is an in-memory state machine, with or without on-disk backing data.
Mem KV Store is a good example that demonstrates what should be done when each method is called. The storage methods are listed below. Follow the link to each method's documentation for details.
Kind | RaftLogStorage method | Return value | Description |
---|---|---|---|
Read log: | get_log_reader() | impl RaftLogReader | get a read-only log reader |
Read log: | ↳ try_get_log_entries() | Vec of Entry | get a range of logs |
Read log: | get_log_state() | LogState | get first/last log id |
Write log: | append() | () | append logs |
Write log: | truncate() | () | delete logs [index, +oo) |
Write log: | purge() | () | purge logs (-oo, index] |
Vote: | save_vote() | () | save vote |
Vote: | read_vote() | Vote | read vote |
Kind | RaftStateMachine method | Return value | Description |
---|---|---|---|
SM: | applied_state() | LogId, Membership | get last applied log id, membership |
SM: | apply() | Vec of AppDataResponse | apply logs to state machine |
Snapshot: | begin_receiving_snapshot() | SnapshotData | begin to install snapshot |
Snapshot: | install_snapshot() | () | install snapshot |
Snapshot: | get_current_snapshot() | Snapshot | get current snapshot |
Snapshot: | get_snapshot_builder() | impl RaftSnapshotBuilder | get a snapshot builder |
Snapshot: | ↳ build_snapshot() | Snapshot | build a snapshot from state machine |
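The write-log rows have precise range semantics: `truncate()` removes `[index, +oo)` and `purge()` removes `(-oo, index]`. The std-only sketch below shows those semantics, plus a first/last query in the spirit of `get_log_state()` and a ranged read like `try_get_log_entries()`. It uses a hypothetical `MemLog` over a `BTreeMap<u64, String>`, not Openraft's `RaftLogStorage`. It also tracks plain indexes where real implementations track log ids, including the last purged one.

```rust
use std::collections::BTreeMap;
use std::ops::RangeBounds;

/// Hypothetical in-memory log keyed by log index; payloads are plain strings.
#[derive(Default)]
pub struct MemLog {
    entries: BTreeMap<u64, String>,
}

impl MemLog {
    /// Append entries; the caller supplies consecutive indexes.
    pub fn append(&mut self, entries: impl IntoIterator<Item = (u64, String)>) {
        self.entries.extend(entries);
    }

    /// Delete logs in [index, +oo), e.g. conflicting entries.
    pub fn truncate(&mut self, index: u64) {
        let _removed = self.entries.split_off(&index);
    }

    /// Delete logs in (-oo, index], e.g. entries already covered by a snapshot.
    pub fn purge(&mut self, index: u64) {
        match index.checked_add(1) {
            Some(next) => self.entries = self.entries.split_off(&next),
            None => self.entries.clear(),
        }
    }

    /// First and last stored index.
    pub fn log_state(&self) -> (Option<u64>, Option<u64>) {
        let first = self.entries.keys().next().copied();
        let last = self.entries.keys().next_back().copied();
        (first, last)
    }

    /// Read the entries whose indexes fall in `range`.
    pub fn get_range<RB: RangeBounds<u64>>(&self, range: RB) -> Vec<String> {
        self.entries.range(range).map(|(_, v)| v.clone()).collect()
    }
}
```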
Most of the APIs are quite straightforward, except two indirect APIs:
- Read logs: RaftLogStorage defines a method get_log_reader() to get a log reader, RaftLogReader:
  trait RaftLogStorage<C: RaftTypeConfig> {
      type LogReader: RaftLogReader<C>;
      async fn get_log_reader(&mut self) -> Self::LogReader;
  }
  RaftLogReader defines the APIs to read logs, and is also a supertrait of RaftLogStorage. Its method try_get_log_entries() gets log entries in a range:
  trait RaftLogReader<C: RaftTypeConfig> {
      async fn try_get_log_entries<RB: RangeBounds<u64>>(&mut self, range: RB) -> Result<Vec<C::Entry>, ...>;
  }
  In addition, RaftLogStorage::get_log_state() gets the latest log state from the storage.
- Build a snapshot: building a snapshot from the local state machine takes two steps. First, RaftStateMachine::get_snapshot_builder() returns a RaftSnapshotBuilder. Then RaftSnapshotBuilder::build_snapshot() builds the snapshot from the state machine.
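One plausible reason for the two-step split: the state machine only has to hand off a consistent view, and it can keep applying logs while the builder produces the snapshot. The std-only sketch below shows that shape. `Sm`, `SnapshotBuilder`, and `Snap` are hypothetical types. Cloning the whole map is a simplifying assumption; a real builder might share data copy-on-write or read from disk.

```rust
use std::collections::BTreeMap;

/// Hypothetical snapshot: last applied index plus serialized state.
#[derive(Debug, PartialEq)]
pub struct Snap {
    pub last_applied: u64,
    pub data: Vec<u8>,
}

/// Step-2 object: owns a point-in-time copy of the state.
pub struct SnapshotBuilder {
    last_applied: u64,
    state: BTreeMap<String, String>,
}

impl SnapshotBuilder {
    /// Serialize the captured state with a toy "key=value\n" encoding.
    pub fn build_snapshot(self) -> Snap {
        let mut data = Vec::new();
        for (k, v) in &self.state {
            data.extend_from_slice(format!("{k}={v}\n").as_bytes());
        }
        Snap { last_applied: self.last_applied, data }
    }
}

#[derive(Default)]
pub struct Sm {
    pub last_applied: u64,
    pub state: BTreeMap<String, String>,
}

impl Sm {
    /// Step 1: capture what the snapshot needs, then let the state machine move on.
    pub fn get_snapshot_builder(&self) -> SnapshotBuilder {
        SnapshotBuilder { last_applied: self.last_applied, state: self.state.clone() }
    }
}
```

Changes applied after `get_snapshot_builder()` returns do not leak into the snapshot, so its `last_applied` stays consistent with its data.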
§Ensure the storage implementation is correct
Openraft provides a test suite for RaftLogStorage and RaftStateMachine. If your implementation passes the tests, Openraft should work well with it. To test your implementation, you have two options:
- Run Suite::test_all() with an async fn() that creates a new pair of RaftLogStorage and RaftStateMachine, as shown in the MemStore test:
#[test]
pub fn test_mem_store() -> anyhow::Result<()> {
openraft::testing::Suite::test_all(MemStore::new_async)
}
- Alternatively, run Suite::test_all() with a StoreBuilder implementation, as shown in the RocksStore test.
Either approach gives you confidence that your custom storage implementation behaves correctly in a distributed system.
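A key idea in such a suite is that it receives a constructor rather than a store, so every test case starts from a fresh, empty instance. The sketch below is a std-only imitation of that pattern. The `LogStore` trait, `run_all` function, and `VecStore` type are hypothetical and are not Openraft's `Suite` API.

```rust
/// Hypothetical minimal storage contract checked by the harness.
pub trait LogStore {
    fn append(&mut self, index: u64, payload: String);
    fn last_index(&self) -> Option<u64>;
    fn truncate(&mut self, index: u64);
}

fn case_empty<S: LogStore>(s: &mut S) -> Result<(), String> {
    match s.last_index() {
        None => Ok(()),
        Some(i) => Err(format!("new store reports last index {i}")),
    }
}

fn case_truncate<S: LogStore>(s: &mut S) -> Result<(), String> {
    for i in 1..=3 {
        s.append(i, format!("e{i}"));
    }
    s.truncate(2); // must remove [2, +oo)
    match s.last_index() {
        Some(1) => Ok(()),
        other => Err(format!("expected Some(1), got {other:?}")),
    }
}

/// Run every case against a freshly built store; returns how many cases passed.
pub fn run_all<S: LogStore, F: Fn() -> S>(builder: F) -> Result<usize, String> {
    let cases: [(&str, fn(&mut S) -> Result<(), String>); 2] = [
        ("empty store", case_empty::<S>),
        ("truncate", case_truncate::<S>),
    ];
    for (name, case) in cases.iter() {
        let mut store = builder(); // a fresh instance per case
        case(&mut store).map_err(|e| format!("{name}: {e}"))?;
    }
    Ok(cases.len())
}

/// Example implementation under test: a Vec of (index, payload).
#[derive(Default)]
pub struct VecStore(Vec<(u64, String)>);

impl LogStore for VecStore {
    fn append(&mut self, index: u64, payload: String) {
        self.0.push((index, payload));
    }
    fn last_index(&self) -> Option<u64> {
        self.0.last().map(|(i, _)| *i)
    }
    fn truncate(&mut self, index: u64) {
        self.0.retain(|(i, _)| *i < index);
    }
}
```

Usage: `run_all(VecStore::default)` passes the constructor itself, much as the MemStore test passes `MemStore::new_async`.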
§An implementation has to guarantee data durability.
The caller always assumes that a completed write is persistent. Raft's correctness depends heavily on a reliable store.
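For a file-backed store, "completed" should mean the data has reached stable storage, not merely the OS page cache. Below is a std-only sketch using a hypothetical `save_durably` helper that stores one value, such as a vote, per file. It writes to a temporary file, calls `sync_all()`, and only then renames the file into place and reports success. For brevity it omits syncing the parent directory after the rename, which some filesystems require for the rename itself to be durable.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Persist `bytes` at `path`; return only after the data is on stable storage.
pub fn save_durably(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut f = File::create(&tmp)?;
        f.write_all(bytes)?;
        // Flush file contents and metadata to the device before acknowledging.
        f.sync_all()?;
    }
    // On POSIX filesystems rename() atomically replaces the target,
    // so readers see either the old value or the new one, never a partial write.
    fs::rename(&tmp, path)?;
    Ok(())
}
```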
§4. Implement RaftNetwork
Raft nodes need to communicate with each other to achieve consensus about the logs.
The trait RaftNetwork defines the data transmission requirements.
pub trait RaftNetwork<C: RaftTypeConfig>: Send + Sync + 'static {
async fn vote(&mut self, rpc: VoteRequest<C::NodeId>) -> Result<...>;
async fn append_entries(&mut self, rpc: AppendEntriesRequest<C>) -> Result<...>;
async fn full_snapshot(&mut self, vote: Vote<C::NodeId>, snapshot: Snapshot<C>) -> Result<...>;
}
An implementation of RaftNetwork can be considered a wrapper that invokes the corresponding methods of a remote Raft. It is responsible for sending and receiving messages between Raft nodes.
Here is the list of methods that need to be implemented for the RaftNetwork trait:
RaftNetwork method | forward request | to target |
---|---|---|
append_entries() | AppendEntriesRequest | remote node Raft::append_entries() |
full_snapshot() | Snapshot | remote node Raft::install_full_snapshot() |
vote() | VoteRequest | remote node Raft::vote() |
Mem KV Network demonstrates how to forward messages to other Raft nodes using reqwest as the network transport layer.
To receive and handle these requests, there should be a server endpoint for each of these RPCs. When the server receives a Raft RPC, it simply passes it to its raft instance and replies with the returned result: Mem KV Server.
For a real-world implementation, you may want to use Tonic gRPC to handle gRPC-based communication between Raft nodes. The databend-meta project provides an excellent real-world example of a Tonic gRPC-based Raft network implementation.
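Without transport details, the client and server sides form a simple forwarding pair. A per-target client sends the request, and the server hands it to the local Raft instance and returns its result unchanged. The std-only sketch below models this with an in-process `Router` in place of HTTP or gRPC. `Rpc`, `LocalRaft`, `Router`, and `NetClient` are hypothetical names, and the toy payloads stand in for `VoteRequest`, `AppendEntriesRequest`, and snapshots.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Toy stand-ins for the three RPC kinds in the table above.
#[derive(Debug, Clone, PartialEq)]
pub enum Rpc {
    Vote(u64), // candidate term
    AppendEntries(Vec<String>),
    FullSnapshot(Vec<u8>),
}

/// Hypothetical node-side Raft handle: records what it was asked to do.
#[derive(Default)]
pub struct LocalRaft {
    pub received: Vec<Rpc>,
}

impl LocalRaft {
    /// Server endpoint logic: hand the RPC to the local Raft and return its result.
    pub fn handle(&mut self, rpc: Rpc) -> Result<String, String> {
        let kind = match &rpc {
            Rpc::Vote(_) => "vote",
            Rpc::AppendEntries(_) => "append_entries",
            Rpc::FullSnapshot(_) => "full_snapshot",
        };
        self.received.push(rpc);
        Ok(format!("{kind} handled"))
    }
}

/// In-process "transport": node id -> that node's Raft handle.
pub type Router = Arc<Mutex<HashMap<u64, Arc<Mutex<LocalRaft>>>>>;

/// Client for one target node, playing the role of a RaftNetwork instance.
pub struct NetClient {
    pub target: u64,
    pub router: Router,
}

impl NetClient {
    /// Forward the request to the target's handler; an error models an unreachable node.
    pub fn send(&self, rpc: Rpc) -> Result<String, String> {
        let node = self
            .router
            .lock()
            .unwrap()
            .get(&self.target)
            .cloned()
            .ok_or_else(|| format!("node {} unreachable", self.target))?;
        let mut raft = node.lock().unwrap();
        raft.handle(rpc)
    }
}
```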
§Implement RaftNetworkFactory
RaftNetworkFactory is a singleton responsible for creating RaftNetwork instances for each replication target node.
pub trait RaftNetworkFactory<C: RaftTypeConfig>: Send + Sync + 'static {
type Network: RaftNetwork<C>;
async fn new_client(&mut self, target: C::NodeId, node: &C::Node) -> Self::Network;
}
This trait contains only one method: RaftNetworkFactory::new_client() builds a new RaftNetwork instance for a target node, intended for sending RPCs to that node. The associated type RaftNetworkFactory::Network represents the application’s implementation of the RaftNetwork trait.
This function should not establish a connection; instead, it should create a client that connects when necessary.
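One way to follow the "connect when necessary" rule: `new_client()` only records the target, and the connection opens on the first send. The std-only sketch below uses hypothetical `Factory` and `LazyClient` types over `std::net::TcpStream`. A production client would also need timeouts and its real RPC encoding.

```rust
use std::io::{self, Write};
use std::net::TcpStream;

/// Hypothetical factory: creating a client never touches the network.
pub struct Factory;

impl Factory {
    pub fn new_client(&mut self, target: u64, addr: &str) -> LazyClient {
        LazyClient { target, addr: addr.to_string(), conn: None }
    }
}

pub struct LazyClient {
    pub target: u64,
    pub addr: String,
    conn: Option<TcpStream>,
}

impl LazyClient {
    pub fn is_connected(&self) -> bool {
        self.conn.is_some()
    }

    /// Connect on first use and reuse the stream afterwards;
    /// drop it on a write error so the next call reconnects.
    pub fn send(&mut self, payload: &[u8]) -> io::Result<()> {
        if self.conn.is_none() {
            self.conn = Some(TcpStream::connect(self.addr.as_str())?);
        }
        let stream = self.conn.as_mut().expect("connected above");
        if let Err(e) = stream.write_all(payload) {
            self.conn = None;
            return Err(e);
        }
        Ok(())
    }
}
```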
§Find the address of the target node.
In Openraft, an implementation of RaftNetwork needs to connect to remote Raft peers. To store additional information about each peer, you need to specify the Node type in RaftTypeConfig:
pub struct TypeConfig {}
impl openraft::RaftTypeConfig for TypeConfig {
// ...
type Node = BasicNode;
}
Then use Raft::add_learner(node_id, BasicNode::new("127.0.0.1"), ...) to instruct Openraft to store node information in Membership. This information is then consistently replicated across all nodes, and will be passed to RaftNetworkFactory::new_client() to connect to remote Raft peers:
{
"configs": [ [ 1, 2, 3 ] ],
"nodes": {
"1": { "addr": "127.0.0.1:21001" },
"2": { "addr": "127.0.0.1:21002" },
"3": { "addr": "127.0.0.1:21003" }
}
}
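In other words, the address used by `new_client()` comes from replicated membership, not from node-local configuration. The std-only sketch below mirrors the JSON above with hypothetical `Membership` and `NodeInfo` types, which are not Openraft's `Membership` or `BasicNode`. It shows the address lookup and a check that every voter has node info.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Mirrors a `{ "addr": ... }` node entry.
#[derive(Clone, Debug, PartialEq)]
pub struct NodeInfo {
    pub addr: String,
}

/// Mirrors the JSON: voter configs plus per-node info.
pub struct Membership {
    pub configs: Vec<BTreeSet<u64>>,
    pub nodes: BTreeMap<u64, NodeInfo>,
}

impl Membership {
    /// Address a network factory would connect to for `target`.
    pub fn addr_of(&self, target: u64) -> Option<&str> {
        self.nodes.get(&target).map(|n| n.addr.as_str())
    }

    /// Every voter needs node info; otherwise it cannot be reached.
    pub fn voters_have_nodes(&self) -> bool {
        self.configs.iter().flatten().all(|id| self.nodes.contains_key(id))
    }
}
```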
§Caution: ensure that you connect to the right node
See: Ensure connection to the correct node
§5. Put everything together
Finally, we put these parts together and boot up a Raft node in main.rs:
// Imports for the external crates used below. Application-specific items
// (Request, Response, NodeId, Opt, ExampleStore, ExampleNetwork, ExampleApp,
// and the raft/management/api handler modules) are defined elsewhere in the example.
use std::io::Cursor;
use std::sync::Arc;
use actix_web::middleware::{self, Logger};
use actix_web::web::Data;
use actix_web::{App, HttpServer};
use clap::Parser;
use env_logger::Env;
use openraft::storage::Adaptor;
use openraft::{BasicNode, Config, TokioRuntime};
// Define the types used in the application.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
pub struct TypeConfig {}
impl openraft::RaftTypeConfig for TypeConfig {
type D = Request;
type R = Response;
type NodeId = NodeId;
type Node = BasicNode;
type Entry = openraft::Entry<TypeConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
env_logger::init_from_env(Env::default().default_filter_or("info"));
// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;
// Create a configuration for the raft instance.
let config = Arc::new(Config::default().validate().unwrap());
// Create the store where the Raft data will be kept.
let store = Arc::new(ExampleStore::default());
let (log_store, state_machine) = Adaptor::new(store.clone());
// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = Arc::new(ExampleNetwork {});
// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap();
// Create an application that will store all the instances created above, this will
// be later used on the actix-web services.
let app = Data::new(ExampleApp {
id: options.id,
raft,
store,
config,
});
// Start the actix-web server.
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.wrap(middleware::Compress::default())
.app_data(app.clone())
// raft internal RPC
.service(raft::append).service(raft::snapshot).service(raft::vote)
// admin API
.service(management::init)
.service(management::add_learner)
.service(management::change_membership)
.service(management::metrics)
.service(management::list_nodes)
// application API
.service(api::write).service(api::read)
})
.bind(options.http_addr)?
.run()
.await
}
§6. Run the cluster
To set up a demo Raft cluster, follow these steps:
- Bring up three uninitialized Raft nodes.
- Initialize a single-node cluster.
- Add more Raft nodes to the cluster.
- Update the membership configuration.
The examples/raft-kv-memstore directory provides a detailed description of these steps.
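The four steps can be modeled as membership transitions. The std-only sketch below uses a hypothetical `Cluster` type whose `initialize`, `add_learner`, and `change_membership` methods only mirror that sequence. They are not Openraft's `Raft` API, they ignore replication, leadership, and joint consensus, and removed voters are simply dropped.

```rust
use std::collections::BTreeSet;

/// Step 1: `Cluster::default()` stands for uninitialized nodes with no membership.
#[derive(Default, Debug)]
pub struct Cluster {
    pub voters: BTreeSet<u64>,
    pub learners: BTreeSet<u64>,
}

impl Cluster {
    /// Step 2: a single-node cluster whose only voter is `id`.
    pub fn initialize(&mut self, id: u64) -> Result<(), String> {
        if !self.voters.is_empty() {
            return Err("already initialized".into());
        }
        self.voters.insert(id);
        Ok(())
    }

    /// Step 3: a learner receives logs but does not vote.
    pub fn add_learner(&mut self, id: u64) {
        if !self.voters.contains(&id) {
            self.learners.insert(id);
        }
    }

    /// Step 4: the new voter set may only contain nodes that are already members.
    pub fn change_membership(&mut self, voters: BTreeSet<u64>) -> Result<(), String> {
        if let Some(id) = voters
            .iter()
            .find(|id| !self.voters.contains(*id) && !self.learners.contains(*id))
        {
            return Err(format!("node {id} must be added as a learner first"));
        }
        // Promoted learners leave the learner set.
        self.learners.retain(|id| !voters.contains(id));
        self.voters = voters;
        Ok(())
    }
}
```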
Additionally, two test scripts for setting up a cluster are available:
- test-cluster.sh is a minimal Bash script that uses curl to communicate with the Raft cluster. It demonstrates the plain HTTP messages being sent and received.
- test_cluster.rs uses the ExampleClient to set up a cluster, write data, and read it back.