Skip to main content

openstack_keystone_distributed_storage/
app.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12//
13// SPDX-License-Identifier: Apache-2.0
14use std::path::Path;
15use std::sync::Arc;
16
17use openraft::Config;
18use tonic::transport::{Server, server::Router};
19use tracing::info;
20
21use crate::grpc::cluster_admin_service::ClusterAdminServiceImpl;
22use crate::grpc::identity_service::IdentityServiceImpl;
23use crate::grpc::raft_service::RaftServiceImpl;
24use crate::network::Network;
25use crate::pb::api::identity_service_server::IdentityServiceServer;
26use crate::pb::raft::cluster_admin_service_server::ClusterAdminServiceServer;
27use crate::pb::raft::raft_service_server::RaftServiceServer;
28use crate::types::*;
29
30/// Start storage node.
31pub async fn start_raft_app<P: AsRef<Path>>(
32    node_id: NodeId,
33    http_addr: String,
34    db_path: P,
35) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
36    let server_future = get_app_server(node_id, db_path)
37        .await?
38        .serve(http_addr.parse()?);
39
40    info!("Node {node_id} starting server at {http_addr}");
41    server_future.await?;
42
43    Ok(())
44}
45
46/// Build a tonic `Server` instance for the raft instance.
47pub async fn get_app_server<P: AsRef<Path>>(
48    node_id: NodeId,
49    db_path: P,
50) -> Result<Router, StoreError> {
51    // Create a configuration for the raft instance.
52    let config = Arc::new(
53        Config {
54            heartbeat_interval: 500,
55            election_timeout_min: 1500,
56            election_timeout_max: 3000,
57            ..Default::default()
58        }
59        .validate()?,
60    );
61
62    // Create stores and network
63    let (log_store, sm) = crate::new::<crate::TypeConfig, _>(db_path).await?;
64    let state_machine_store = Arc::new(sm);
65    let network = Network {};
66
67    // Create Raft instance
68    let raft = Raft::new(
69        node_id,
70        config.clone(),
71        network,
72        log_store,
73        state_machine_store.clone(),
74    )
75    .await?;
76
77    //// Create the management service with raft instance
78    let internal_service = RaftServiceImpl::new(raft.clone());
79    let cluster_admin_service = ClusterAdminServiceImpl::new(raft.clone());
80    let identity_service = IdentityServiceImpl::new(raft.clone(), state_machine_store.clone());
81
82    //// The app service uses the default limit since it's user-facing.
83    let raft_service = RaftServiceServer::new(internal_service);
84    let identity_service = IdentityServiceServer::new(identity_service);
85    let cluster_admin_service = ClusterAdminServiceServer::new(cluster_admin_service);
86
87    Ok(Server::builder()
88        .add_service(raft_service)
89        .add_service(cluster_admin_service)
90        .add_service(identity_service))
91}