use std::path::PathBuf;
use std::sync::Arc;
use serde::Deserialize;
use super::{
backend::StateBackend, in_process::InProcessBackend, object_store::ObjectStoreBackend,
};
/// Vnode capacity used when a configuration does not set `vnode_capacity`
/// explicitly (serde defaults and the convenience constructors both use it).
pub const DEFAULT_VNODE_CAPACITY: u32 = 256;
/// Serde `default = "..."` hook: supplies [`DEFAULT_VNODE_CAPACITY`] when the
/// `vnode_capacity` field is absent from the deserialized input.
fn default_vnode_capacity() -> u32 {
DEFAULT_VNODE_CAPACITY
}
/// Serde `default = "..."` hook for the `Local` variant's `instance_id`:
/// yields the owned string `"local"` when the field is absent.
fn default_instance_id() -> String {
    String::from("local")
}
/// Discovery mode selected by the `discovery` field of an object-store
/// backend configuration.
///
/// Deserialized from the snake_case variant names (`"static"`, `"dynamic"`).
/// NOTE(review): what each mode does at runtime is not visible in this file;
/// presumably it controls how peer instances are found — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DiscoveryMode {
/// The default when `discovery` is omitted.
#[default]
Static,
/// Opt-in alternative; the tests in this file configure it together with `seed_peers`.
Dynamic,
}
/// String key/value options handed to the object-store builder.
///
/// `#[serde(transparent)]` makes this deserialize directly from a map (e.g. a
/// TOML `[storage]` table). `Debug` is implemented by hand rather than derived
/// so that values are never printed.
#[derive(Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(transparent)]
pub struct StorageOptions(pub rustc_hash::FxHashMap<String, String>);
/// Redacting `Debug`: option keys are shown, every value is replaced by the
/// literal `"[REDACTED]"` so values cannot leak into logs.
impl std::fmt::Debug for StorageOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut redacted = f.debug_map();
        for option_name in self.0.keys() {
            // Only the key is taken from the map; the value is a fixed placeholder.
            redacted.entry(option_name, &"[REDACTED]");
        }
        redacted.finish()
    }
}
/// Configuration that selects and parameterizes a state backend.
///
/// Deserialized as an internally tagged enum: the `backend` field picks the
/// variant by its snake_case name (`in_process`, `local`, `object_store`).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(tag = "backend", rename_all = "snake_case")]
pub enum StateBackendConfig {
/// Built into an `InProcessBackend`; the only variant for which
/// `is_durable` returns `false`.
InProcess {
/// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
#[serde(default = "default_vnode_capacity")]
vnode_capacity: u32,
},
/// Backed by a directory on the local filesystem, wrapped in an
/// `object_store` `LocalFileSystem` when built.
Local {
/// Root directory; created (including parents) when the backend is built.
path: PathBuf,
/// Defaults to `"local"` when omitted.
#[serde(default = "default_instance_id")]
instance_id: String,
/// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
#[serde(default = "default_vnode_capacity")]
vnode_capacity: u32,
},
/// Backed by an object store resolved from `url` and `storage`.
ObjectStore {
/// Store location, e.g. `s3://bucket/prefix` or `file://...` (both appear in this file's tests).
url: String,
/// Extra builder options; empty when omitted.
#[serde(default)]
storage: StorageOptions,
/// Required: unlike `Local`, there is no serde default for this field.
instance_id: String,
/// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
#[serde(default = "default_vnode_capacity")]
vnode_capacity: u32,
/// `None` when omitted.
/// NOTE(review): not read by `build`/`build_object_store` in this file;
/// presumably consumed by cluster wiring elsewhere — confirm.
#[serde(default)]
vnodes: Option<Vec<u32>>,
/// `None` when omitted. NOTE(review): not read in this file — confirm its consumer.
#[serde(default)]
merger_instance: Option<String>,
/// Defaults to [`DiscoveryMode::Static`] when omitted.
#[serde(default)]
discovery: DiscoveryMode,
/// Empty when omitted. NOTE(review): not read in this file — confirm its consumer.
#[serde(default)]
seed_peers: Vec<String>,
},
}
impl Default for StateBackendConfig {
fn default() -> Self {
Self::InProcess {
vnode_capacity: DEFAULT_VNODE_CAPACITY,
}
}
}
/// Errors returned while turning a [`StateBackendConfig`] into a backend or
/// object store.
#[derive(Debug, thiserror::Error)]
pub enum StateBackendBuildError {
/// The object-store builder rejected the URL/options; converted
/// automatically via `?` thanks to `#[from]`.
#[error("state backend object store: {0}")]
Store(#[from] crate::checkpoint::object_store_builder::ObjectStoreBuilderError),
/// Creating the local directory or opening the local filesystem store
/// failed; carries the underlying error rendered as a string.
#[error("state backend construction failed: {0}")]
Io(String),
}
impl StateBackendConfig {
    /// Returns an in-process configuration with [`DEFAULT_VNODE_CAPACITY`].
    #[must_use]
    pub fn in_process() -> Self {
        Self::InProcess {
            vnode_capacity: DEFAULT_VNODE_CAPACITY,
        }
    }

    /// Returns a local-directory configuration rooted at `path`, using the
    /// default instance id (`"local"`) and [`DEFAULT_VNODE_CAPACITY`].
    #[must_use]
    pub fn local(path: impl Into<PathBuf>) -> Self {
        Self::Local {
            path: path.into(),
            instance_id: default_instance_id(),
            vnode_capacity: DEFAULT_VNODE_CAPACITY,
        }
    }

    /// Returns an object-store configuration for `url` owned by `instance_id`.
    ///
    /// All optional fields take the same values serde would use when they are
    /// omitted: empty storage options, [`DEFAULT_VNODE_CAPACITY`], no vnode
    /// list, no merger instance, static discovery and no seed peers.
    #[must_use]
    pub fn object_store(url: impl Into<String>, instance_id: impl Into<String>) -> Self {
        Self::ObjectStore {
            url: url.into(),
            storage: StorageOptions::default(),
            instance_id: instance_id.into(),
            vnode_capacity: DEFAULT_VNODE_CAPACITY,
            vnodes: None,
            merger_instance: None,
            discovery: DiscoveryMode::Static,
            seed_peers: Vec::new(),
        }
    }

    /// Builds the state backend described by this configuration.
    ///
    /// * `InProcess` produces an `InProcessBackend`.
    /// * `Local` creates the directory if needed and wraps it in an
    ///   `ObjectStoreBackend` over the local filesystem.
    /// * `ObjectStore` resolves the store from `url` + `storage` and wraps it
    ///   in an `ObjectStoreBackend`. The `vnodes`, `merger_instance`,
    ///   `discovery` and `seed_peers` fields are not consulted here.
    ///
    /// # Errors
    ///
    /// Returns [`StateBackendBuildError::Io`] when the local directory cannot
    /// be created or opened, and [`StateBackendBuildError::Store`] when the
    /// object-store builder rejects the URL or options.
    // NOTE(review): nothing is awaited in this body, hence the lint allowance.
    // Presumably the signature stays `async` so call sites do not change if a
    // backend later needs asynchronous setup — confirm. Note that the `Local`
    // arm performs blocking filesystem calls.
    #[allow(clippy::unused_async)]
    pub async fn build(&self) -> Result<Arc<dyn StateBackend>, StateBackendBuildError> {
        match self {
            Self::InProcess { vnode_capacity } => {
                Ok(Arc::new(InProcessBackend::new(*vnode_capacity)))
            }
            Self::Local {
                path,
                instance_id,
                vnode_capacity,
            } => {
                let store = Self::open_local_store(path)?;
                Ok(Arc::new(ObjectStoreBackend::new(
                    store,
                    instance_id,
                    *vnode_capacity,
                )))
            }
            Self::ObjectStore {
                url,
                storage,
                instance_id,
                vnode_capacity,
                ..
            } => {
                let store = cloud_store(url, storage)?;
                Ok(Arc::new(ObjectStoreBackend::new(
                    store,
                    instance_id,
                    *vnode_capacity,
                )))
            }
        }
    }

    /// Returns the local storage directory, which only the `Local` variant has.
    #[must_use]
    pub fn local_storage_dir(&self) -> Option<&std::path::Path> {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Local { path, .. } => Some(path.as_path()),
            Self::InProcess { .. } | Self::ObjectStore { .. } => None,
        }
    }

    /// Builds the raw object store behind this configuration, without
    /// wrapping it in a state backend.
    ///
    /// Returns `Ok(None)` for `InProcess`, which has no object store.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Self::build`]: [`StateBackendBuildError::Io`]
    /// for the local directory, [`StateBackendBuildError::Store`] for the
    /// object-store builder.
    pub fn build_object_store(
        &self,
    ) -> Result<Option<Arc<dyn ::object_store::ObjectStore>>, StateBackendBuildError> {
        match self {
            Self::InProcess { .. } => Ok(None),
            Self::Local { path, .. } => Self::open_local_store(path).map(Some),
            Self::ObjectStore { url, storage, .. } => cloud_store(url, storage).map(Some),
        }
    }

    /// Reports whether the backend uses storage other than the in-process one:
    /// `true` for every variant except `InProcess`.
    #[must_use]
    pub fn is_durable(&self) -> bool {
        !matches!(self, Self::InProcess { .. })
    }

    /// Returns the configured vnode capacity, whichever variant this is.
    #[must_use]
    pub fn vnode_capacity(&self) -> u32 {
        match self {
            Self::InProcess { vnode_capacity }
            | Self::Local { vnode_capacity, .. }
            | Self::ObjectStore { vnode_capacity, .. } => *vnode_capacity,
        }
    }

    /// Creates `path` (and any missing parents) and opens it as a
    /// `LocalFileSystem` object store rooted at that directory.
    ///
    /// Shared by [`Self::build`] and [`Self::build_object_store`] so both
    /// construct the local store the same way and report failures identically.
    fn open_local_store(
        path: &std::path::Path,
    ) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
        // The directory is created before the store is opened on it.
        std::fs::create_dir_all(path).map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
        let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
            .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
        Ok(Arc::new(fs))
    }
}
/// Resolves an object store from `url`, forwarding a copy of the configured
/// storage options to the shared object-store builder.
///
/// # Errors
///
/// Propagates the builder's error, converted into
/// [`StateBackendBuildError::Store`] by `?`.
fn cloud_store(
    url: &str,
    storage: &StorageOptions,
) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
    // The builder borrows its own collection type, so the options are copied
    // into it; the concrete collection is inferred from the builder signature.
    let options = storage
        .0
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect();
    let store = crate::checkpoint::object_store_builder::build_object_store(url, &options)?;
    Ok(store)
}
// Unit tests: TOML parsing of every backend variant, backend construction for
// each variant, and the `Default` / `PartialEq` implementations.
#[cfg(test)]
mod tests {
use super::*;
// A bare `backend = "in_process"` parses, picks up the default vnode
// capacity (256), is not durable and has no local storage directory.
#[test]
fn parse_in_process_minimal() {
let toml = r#"backend = "in_process""#;
let c: StateBackendConfig = toml::from_str(toml).unwrap();
assert!(matches!(
c,
StateBackendConfig::InProcess {
vnode_capacity: 256
}
));
assert!(!c.is_durable());
assert!(c.local_storage_dir().is_none());
}
// A local backend keeps its path, is durable, and honors an explicit
// `vnode_capacity` override.
#[test]
fn parse_local_with_path() {
let toml = r#"
backend = "local"
path = "/var/laminar"
vnode_capacity = 128
"#;
let c: StateBackendConfig = toml::from_str(toml).unwrap();
assert_eq!(
c.local_storage_dir(),
Some(std::path::Path::new("/var/laminar"))
);
assert!(c.is_durable());
if let StateBackendConfig::Local { vnode_capacity, .. } = c {
assert_eq!(vnode_capacity, 128);
} else {
panic!("expected Local");
}
}
// An object-store backend without a `discovery` key defaults to static
// discovery and keeps the explicit vnode list and merger instance.
#[test]
fn parse_object_store_static() {
let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
vnodes = [0, 1, 2, 3]
merger_instance = "node-0"
"#;
let c: StateBackendConfig = toml::from_str(toml).unwrap();
match c {
StateBackendConfig::ObjectStore {
url,
instance_id,
vnodes,
merger_instance,
discovery,
..
} => {
assert_eq!(url, "s3://bucket/laminar");
assert_eq!(instance_id, "node-0");
assert_eq!(vnodes, Some(vec![0, 1, 2, 3]));
assert_eq!(merger_instance.as_deref(), Some("node-0"));
assert_eq!(discovery, DiscoveryMode::Static);
}
_ => panic!("expected ObjectStore"),
}
}
// `discovery = "dynamic"` selects the dynamic mode and the seed peer list
// is parsed (only its length is checked here).
#[test]
fn parse_object_store_dynamic() {
let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
discovery = "dynamic"
seed_peers = ["10.0.0.1:7946", "10.0.0.2:7946"]
"#;
let c: StateBackendConfig = toml::from_str(toml).unwrap();
match c {
StateBackendConfig::ObjectStore {
discovery,
seed_peers,
..
} => {
assert_eq!(discovery, DiscoveryMode::Dynamic);
assert_eq!(seed_peers.len(), 2);
}
_ => panic!("expected ObjectStore dynamic"),
}
}
// The in-process backend builds and round-trips a partial through
// `write_partial` / `read_partial`.
#[tokio::test]
async fn build_in_process_returns_backend() {
use bytes::Bytes;
let c = StateBackendConfig::in_process();
let backend = c.build().await.unwrap();
backend
.write_partial(0, 1, 0, Bytes::from_static(b"ok"))
.await
.unwrap();
assert_eq!(
&backend.read_partial(0, 1).await.unwrap().unwrap()[..],
b"ok",
);
}
// The local backend builds on a temporary directory and round-trips a
// partial.
#[tokio::test]
async fn build_local_instantiates_backend() {
let dir = tempfile::tempdir().unwrap();
let c = StateBackendConfig::local(dir.path());
let backend = c.build().await.unwrap();
backend
.write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
.await
.unwrap();
assert_eq!(
&backend.read_partial(0, 1).await.unwrap().unwrap()[..],
b"z",
);
}
// The object-store variant also accepts a `file://` URL and round-trips a
// partial through it.
#[tokio::test]
async fn build_object_store_file_url_instantiates_backend() {
let dir = tempfile::tempdir().unwrap();
// Backslashes are swapped for forward slashes before forming the URL,
// presumably so Windows temp paths are usable — confirm.
let url = format!(
"file://{}",
dir.path().display().to_string().replace('\\', "/")
);
let c = StateBackendConfig::object_store(url, "node-0");
let backend = c.build().await.unwrap();
backend
.write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
.await
.unwrap();
let got = backend.read_partial(0, 1).await.unwrap().unwrap();
assert_eq!(&got[..], b"z");
}
// Without the `aws` feature an `s3://` URL must fail with the builder's
// `MissingFeature` error, surfaced as `StateBackendBuildError::Store`.
#[cfg(not(feature = "aws"))]
#[tokio::test]
async fn build_object_store_s3_requires_aws_feature() {
use crate::checkpoint::object_store_builder::ObjectStoreBuilderError;
let c = StateBackendConfig::object_store("s3://bucket/path", "node-0");
let err = match c.build().await {
Ok(_) => panic!("s3 must not build without the aws feature"),
Err(e) => e,
};
assert!(
matches!(
err,
StateBackendBuildError::Store(ObjectStoreBuilderError::MissingFeature { .. })
),
"got: {err}",
);
}
// With the `aws` feature, an S3 configuration carrying a `[storage]` table
// builds successfully; the test only constructs the backend and issues no
// reads or writes.
#[cfg(feature = "aws")]
#[tokio::test]
async fn build_object_store_s3_builds_with_storage_options() {
let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
[storage]
endpoint = "http://127.0.0.1:9000"
aws_access_key_id = "k"
aws_secret_access_key = "s"
region = "us-east-1"
allow_http = "true"
"#;
let c: StateBackendConfig = toml::from_str(toml).unwrap();
c.build().await.expect("s3 client must build offline");
}
// `Default` yields the in-process variant.
#[test]
fn default_is_in_process() {
let c = StateBackendConfig::default();
assert!(matches!(c, StateBackendConfig::InProcess { .. }));
}
// Derived equality: identical configs compare equal, different variants
// do not.
#[test]
fn partial_eq_works() {
assert_eq!(
StateBackendConfig::in_process(),
StateBackendConfig::in_process()
);
assert_ne!(
StateBackendConfig::in_process(),
StateBackendConfig::local("/tmp/x")
);
}
}