/// Raft networking; supplies the `NetworkFactory` that the bootstrap helpers hand to openraft.
pub mod network;
/// Storage built on `RocksStore`; compiled only when the `persistent` feature is enabled.
#[cfg(feature = "persistent")]
pub mod persistent_store;
// NOTE(review): presumably the HTTP routes serving Raft RPCs — not referenced in this file, confirm.
pub mod routes;
// NOTE(review): presumably where `ClusterCommand`s are applied — not referenced in this file, confirm.
pub mod state_machine;
/// `MemStore` storage and the `SharedCoordinatorState` handle returned by the bootstrap helpers.
pub mod store;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::sync::Arc;
use openraft::network::RaftNetworkFactory;
use serde::{Deserialize, Serialize};
use crate::connector_config::ClusterConnector;
use crate::worker::WorkerCapacity;
/// Identifier of a Raft node in the cluster.
///
/// The bootstrap code in this module seeds the cluster from node `1` and numbers the
/// initial members `1..=N` by their position in the peer address list.
pub type NodeId = u64;
// Binds this application's types to openraft's type configuration:
//   D      - payload of each replicated log entry (`ClusterCommand`)
//   R      - value returned for an applied entry (`ClusterResponse`)
//   NodeId - node identifier type (`u64`)
//   Node   - per-node data kept in the membership config (`RaftNode`)
// NOTE(review): types not listed here are expected to fall back to the macro's
// defaults — confirm against the openraft version pinned in Cargo.toml.
openraft::declare_raft_types!(
pub TypeConfig:
D = ClusterCommand,
R = ClusterResponse,
NodeId = NodeId,
Node = RaftNode,
);
/// The openraft `Raft` handle specialised for this crate's [`TypeConfig`].
pub type VarpulisRaft = openraft::Raft<TypeConfig>;
/// Per-node data stored in the Raft membership (the `Node` type of [`TypeConfig`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RaftNode {
/// Address of the node, taken verbatim from the peer address list at bootstrap.
// NOTE(review): exact format (host:port vs. full URL) is not visible here — confirm.
pub addr: String,
}
/// A [`RaftNode`] displays as its bare address.
impl std::fmt::Display for RaftNode {
    /// Writes `addr` verbatim; width, fill and alignment flags on the outer
    /// formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.addr.as_str())
    }
}
/// Commands replicated through the Raft log (the `D` type of [`TypeConfig`]).
///
/// Each variant carries the data for one change to cluster state. How a variant is
/// applied is decided outside this file. The enum is (de)serialized with serde, so
/// variant and field names are part of the stored/wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterCommand {
// --- Workers (identified by `id`) ---
RegisterWorker {
id: String,
address: String,
api_key: String,
capacity: WorkerCapacity,
},
DeregisterWorker {
id: String,
},
WorkerStatusChanged {
id: String,
// Status is carried as a free-form string, not an enum.
status: String,
},
WorkerPipelinesUpdated {
id: String,
assigned_pipelines: Vec<String>,
},
WorkerMetricsUpdated {
id: String,
events_processed: u64,
pipelines_running: usize,
// A missing field deserializes to an empty Vec (serde `default`).
// NOTE(review): presumably for compatibility with entries written before this field existed — confirm.
#[serde(default)]
pipeline_metrics: Vec<crate::worker::PipelineMetrics>,
},
// --- Pipeline groups (identified by `name`); `group` is untyped JSON ---
GroupDeployed {
name: String,
group: serde_json::Value,
},
GroupUpdated {
name: String,
group: serde_json::Value,
},
GroupRemoved {
name: String,
},
// --- Migrations; `task` is untyped JSON, later updates refer to it by `id` ---
MigrationStarted {
task: serde_json::Value,
},
MigrationUpdated {
id: String,
status: String,
},
MigrationRemoved {
id: String,
},
// --- Connectors (identified by `name`) ---
ConnectorCreated {
name: String,
connector: ClusterConnector,
},
ConnectorUpdated {
name: String,
connector: ClusterConnector,
},
ConnectorRemoved {
name: String,
},
// --- Scaling policy; untyped JSON ---
// NOTE(review): `None` presumably clears the policy — confirm in the apply logic.
ScalingPolicySet {
policy: Option<serde_json::Value>,
},
// --- Model registry (identified by `name`) ---
ModelRegistered {
name: String,
entry: crate::model_registry::ModelRegistryEntry,
},
ModelRemoved {
name: String,
},
}
/// Response type paired with [`ClusterCommand`] (the `R` type of [`TypeConfig`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterResponse {
/// Success; carries no payload.
Ok,
/// Failure, described by a human-readable `message`.
Error { message: String },
}
/// Handles produced by the bootstrap functions in this module.
pub struct RaftBootstrapResult {
/// The Raft instance, wrapped in an `Arc`.
pub raft: Arc<VarpulisRaft>,
/// State handle created together with the storage (`with_shared_state`).
pub shared_state: store::SharedCoordinatorState,
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl std::fmt::Debug for RaftBootstrapResult {
    /// Renders as `RaftBootstrapResult { .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("RaftBootstrapResult");
        opaque.finish_non_exhaustive()
    }
}
/// Starts a Raft node backed by `store::MemStore`.
///
/// Creates the store together with its shared-state handle, splits it into the
/// log-store / state-machine pair via `openraft::storage::Adaptor`, and delegates
/// everything else to `bootstrap_with_storage`.
///
/// # Errors
///
/// Returns whatever `bootstrap_with_storage` returns.
pub async fn bootstrap(
    node_id: NodeId,
    peer_addrs: &[String],
    admin_key: Option<String>,
) -> Result<RaftBootstrapResult, Box<dyn std::error::Error + Send + Sync>> {
    let (backing, shared_state) = store::MemStore::with_shared_state();
    let (log_half, sm_half) = openraft::storage::Adaptor::new(backing);

    bootstrap_with_storage(node_id, peer_addrs, admin_key, log_half, sm_half, shared_state).await
}
/// Starts a Raft node backed by `persistent_store::RocksStore`.
///
/// The store is opened at `<data_dir>/node-<node_id>`. The rest of the setup is
/// delegated to `bootstrap_with_storage`.
///
/// Only available with the `persistent` feature.
///
/// # Errors
///
/// Propagates a failure to open the store, and any error from
/// `bootstrap_with_storage`.
#[cfg(feature = "persistent")]
pub async fn bootstrap_persistent(
    node_id: NodeId,
    peer_addrs: &[String],
    admin_key: Option<String>,
    data_dir: &str,
) -> Result<RaftBootstrapResult, Box<dyn std::error::Error + Send + Sync>> {
    // One directory per node under the shared data directory.
    let node_path = format!("{data_dir}/node-{node_id}");

    let opened = persistent_store::RocksStore::open_with_shared_state(&node_path)?;
    let (rocks, shared_state) = opened;
    let (log_half, sm_half) = openraft::storage::Adaptor::new(rocks);

    bootstrap_with_storage(node_id, peer_addrs, admin_key, log_half, sm_half, shared_state).await
}
/// Shared tail of the bootstrap functions: builds the Raft instance on the given
/// storage halves and, on node `1` only, tries to initialize the cluster.
///
/// * `peer_addrs` — addresses of the initial members; the address at index `i`
///   is registered under node id `i + 1`. Only read when `node_id == 1`.
/// * `admin_key` — passed to `network::NetworkFactory::new`.
/// * `log_store` / `state_machine` — the two halves from `openraft::storage::Adaptor`.
/// * `shared_state` — returned unchanged in the result.
///
/// A failed `initialize` call is logged at info level and otherwise ignored; the
/// log message treats it as "may be already initialized".
/// NOTE(review): every `initialize` error is handled this way, including ones
/// unrelated to prior initialization — confirm that is intended.
///
/// # Errors
///
/// Returns an error only when `openraft::Raft::new` fails.
async fn bootstrap_with_storage<S>(
    node_id: NodeId,
    peer_addrs: &[String],
    admin_key: Option<String>,
    log_store: openraft::storage::Adaptor<TypeConfig, S>,
    state_machine: openraft::storage::Adaptor<TypeConfig, S>,
    shared_state: store::SharedCoordinatorState,
) -> Result<RaftBootstrapResult, Box<dyn std::error::Error + Send + Sync>>
where
    S: openraft::RaftStorage<TypeConfig>,
{
    // NOTE(review): openraft documents these intervals in milliseconds — confirm
    // for the pinned version.
    let raft_config = Arc::new(openraft::Config {
        heartbeat_interval: 500,
        election_timeout_min: 1500,
        election_timeout_max: 3000,
        ..Default::default()
    });
    let factory = network::NetworkFactory::new(admin_key);

    let raft = openraft::Raft::new(node_id, raft_config, factory, log_store, state_machine)
        .await
        .map_err(|e| format!("Failed to create Raft instance: {e}"))?;

    if node_id != 1 {
        tracing::info!("Raft node {node_id}: waiting for cluster bootstrap from node 1");
    } else {
        // Node 1 seeds the membership: ids are 1-based positions in `peer_addrs`.
        let members: BTreeMap<NodeId, RaftNode> = (1u64..)
            .zip(peer_addrs)
            .map(|(nid, addr)| (nid, RaftNode { addr: addr.clone() }))
            .collect();

        if let Err(e) = raft.initialize(members).await {
            tracing::info!("Raft initialize (node {node_id}): {e} (may be already initialized)");
        }
    }

    let raft = Arc::new(raft);
    Ok(RaftBootstrapResult { raft, shared_state })
}
/// Builds a Raft node on `store::MemStore` using a caller-supplied config and
/// network factory.
///
/// Unlike the other bootstrap functions, this one never calls `initialize`; the
/// returned Raft instance is left exactly as `openraft::Raft::new` produced it.
///
/// # Errors
///
/// Returns an error when `openraft::Raft::new` fails.
pub async fn bootstrap_with_network<NF>(
    node_id: NodeId,
    config: Arc<openraft::Config>,
    network: NF,
) -> Result<RaftBootstrapResult, Box<dyn std::error::Error + Send + Sync>>
where
    NF: RaftNetworkFactory<TypeConfig>,
{
    let (backing, shared_state) = store::MemStore::with_shared_state();
    let (log_half, sm_half) = openraft::storage::Adaptor::new(backing);

    let raft = openraft::Raft::new(node_id, config, network, log_half, sm_half)
        .await
        .map_err(|e| format!("Failed to create Raft instance: {e}"))?;

    let raft = Arc::new(raft);
    Ok(RaftBootstrapResult { raft, shared_state })
}