use crate::datafold_node::DataFoldNode;
use crate::ingestion::IngestionError;
use crate::lambda::config::{LambdaConfig, LambdaStorage};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Owns the `DataFoldNode` handles used by the Lambda entry points.
///
/// Depending on the storage configuration inspected in `NodeManager::new`,
/// the manager either holds one node that is returned for every user, or
/// builds nodes on demand and caches them by user id.
pub struct NodeManager {
/// Configuration the nodes are built from (storage backend and optional
/// schema service URL).
config: LambdaConfig,
/// Cache of on-demand nodes, keyed by the user id passed to `get_node`.
/// Guarded by a `std::sync::Mutex`; the nodes themselves sit behind a
/// `tokio::sync::Mutex`.
nodes: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<DataFoldNode>>>>>,
/// Node created eagerly by `new` when the storage is not a DynamoDB
/// configuration; `None` otherwise. When set, `get_node` returns it for
/// every user id.
single_node: Option<Arc<tokio::sync::Mutex<DataFoldNode>>>,
}
impl NodeManager {
    /// Returns a handle to the eagerly created single-tenant node, or `None`
    /// when the manager was built without one (DynamoDB storage configuration).
    pub fn get_single_node(&self) -> Option<Arc<tokio::sync::Mutex<DataFoldNode>>> {
        self.single_node.clone()
    }

    /// Builds a manager from `config`.
    ///
    /// For a DynamoDB storage configuration no node is created up front; nodes
    /// are built per user by [`NodeManager::get_node`]. For every other storage
    /// setting a single node is created immediately for the user named by the
    /// `FOLDB_USER_ID` environment variable and is then shared by all callers.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when single-tenant mode is selected and
    /// `FOLDB_USER_ID` cannot be read, and propagates any error from creating
    /// the single node.
    pub async fn new(config: LambdaConfig) -> Result<Self, IngestionError> {
        // Decide the tenancy mode before `config` is moved into the manager so
        // the whole configuration does not need to be cloned.
        // NOTE(review): `create_node` gates the DynamoDb arm behind the
        // `aws-backend` feature while this pattern is not gated — confirm the
        // variant exists when that feature is disabled.
        let per_user_nodes = matches!(
            &config.storage,
            LambdaStorage::Config(crate::storage::DatabaseConfig::DynamoDb(_))
        );

        let mut manager = Self {
            config,
            nodes: Arc::new(Mutex::new(HashMap::new())),
            single_node: None,
        };

        if !per_user_nodes {
            let user_id = std::env::var("FOLDB_USER_ID").map_err(|_| {
                IngestionError::configuration_error(
                    "FOLDB_USER_ID environment variable required for single-tenant mode",
                )
            })?;
            let node = manager.create_node(&user_id).await?;
            manager.single_node = Some(node);
        }

        Ok(manager)
    }

    /// Returns the node that serves `user_id`.
    ///
    /// When a single-tenant node exists it is returned for every `user_id`.
    /// Otherwise the per-user cache is consulted and, on a miss, a new node is
    /// created and cached.
    ///
    /// The cache lock is never held while the node is being created, so two
    /// callers may build a node for the same user at the same time. The node
    /// that reaches the cache first is kept and handed to both callers; the
    /// other freshly built node is dropped.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the node.
    pub async fn get_node(
        &self,
        user_id: &str,
    ) -> Result<Arc<tokio::sync::Mutex<DataFoldNode>>, IngestionError> {
        if let Some(node) = &self.single_node {
            return Ok(Arc::clone(node));
        }

        if let Some(node) = self.cached_node(user_id) {
            return Ok(node);
        }

        // Built without holding the std mutex: its guard must not live across
        // an `.await`.
        let created = self.create_node(user_id).await?;

        // `or_insert` keeps an entry that a concurrent caller stored while we
        // were awaiting, so every caller ends up sharing one node per user
        // instead of the later insert silently replacing the earlier one.
        let node = self
            .lock_nodes()
            .entry(user_id.to_string())
            .or_insert(created)
            .clone();
        Ok(node)
    }

    /// Looks up `user_id` in the per-user cache and returns a cloned handle.
    fn cached_node(&self, user_id: &str) -> Option<Arc<tokio::sync::Mutex<DataFoldNode>>> {
        self.lock_nodes().get(user_id).cloned()
    }

    /// Locks the per-user cache, recovering the guard if the mutex is poisoned.
    ///
    /// The map is only modified through single `insert`/`remove` calls, so a
    /// panic in another thread cannot leave it half-updated and it is safe to
    /// keep using it.
    fn lock_nodes(
        &self,
    ) -> std::sync::MutexGuard<'_, HashMap<String, Arc<tokio::sync::Mutex<DataFoldNode>>>> {
        self.nodes
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Builds a fresh node for `user_id` from the configured storage.
    ///
    /// * `LambdaStorage::Config` — derives a `NodeConfig` from the database
    ///   configuration (for DynamoDB, a copy with `user_id` filled in) and
    ///   opens the database through the factory.
    /// * `LambdaStorage::DbOps` — wraps the supplied `db_ops` in a `FoldDB`
    ///   with an in-memory progress store.
    ///
    /// In both cases the schema service URL from the Lambda configuration is
    /// applied to the node configuration when present.
    ///
    /// # Errors
    ///
    /// Database creation failures are reported as `IngestionError::StorageError`
    /// and node construction failures as `IngestionError::InvalidInput`.
    async fn create_node(
        &self,
        user_id: &str,
    ) -> Result<Arc<tokio::sync::Mutex<DataFoldNode>>, IngestionError> {
        use crate::datafold_node::config::{DatabaseConfig, NodeConfig};
        use crate::fold_db_core::factory;
        use crate::fold_db_core::FoldDB;

        let (db, node_config) = match &self.config.storage {
            LambdaStorage::Config(storage_config) => {
                let mut node_config = match storage_config {
                    DatabaseConfig::Local { path } => NodeConfig::new(path.clone()),
                    #[cfg(feature = "aws-backend")]
                    DatabaseConfig::DynamoDb(dynamo_config) => {
                        // Scope a copy of the shared DynamoDB settings to this user.
                        let mut cfg = NodeConfig::default();
                        let mut d_cfg = dynamo_config.clone();
                        d_cfg.user_id = Some(user_id.to_string());
                        cfg.database = DatabaseConfig::DynamoDb(d_cfg);
                        cfg
                    }
                };
                if let Some(schema_url) = &self.config.schema_service_url {
                    node_config = node_config.with_schema_service_url(schema_url);
                }
                let db = factory::create_fold_db(&node_config.database)
                    .await
                    .map_err(|e| IngestionError::StorageError(e.to_string()))?;
                (db, node_config)
            }
            LambdaStorage::DbOps(db_ops) => {
                // The caller supplies the storage operations; this string is
                // what gets passed on as the database path.
                let db_path = "custom_backend".to_string();
                let progress_store =
                    Arc::new(crate::progress::InMemoryProgressStore::new());
                let fold_db =
                    FoldDB::new_with_components(Arc::clone(db_ops), &db_path, Some(progress_store))
                        .await
                        .map_err(|e| IngestionError::StorageError(e.to_string()))?;
                let node_config = NodeConfig::new(std::path::PathBuf::from(db_path));
                let node_config = if let Some(schema_url) = &self.config.schema_service_url {
                    node_config.with_schema_service_url(schema_url)
                } else {
                    node_config
                };
                (Arc::new(tokio::sync::Mutex::new(fold_db)), node_config)
            }
        };

        let node = DataFoldNode::new_with_db(node_config, db)
            .await
            .map_err(|e| IngestionError::InvalidInput(e.to_string()))?;
        Ok(Arc::new(tokio::sync::Mutex::new(node)))
    }

    /// Drops the cached node for `user_id`, if any, so the next
    /// [`NodeManager::get_node`] call for that user builds a new one.
    ///
    /// Only the per-user cache is touched; a single-tenant node is not
    /// affected. Handles already given out stay valid.
    pub fn invalidate_node(&self, user_id: &str) {
        // Goes through `lock_nodes` so a poisoned mutex no longer causes the
        // removal to be skipped silently.
        self.lock_nodes().remove(user_id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lambda::config::{LambdaConfig, LambdaStorage};
    use tempfile::tempdir;

    /// With local storage the manager creates one node up front, so lookups
    /// for different user ids must resolve to the same node.
    #[tokio::test]
    async fn test_node_manager_single_mode() {
        // Single-tenant construction reads the user id from the environment.
        std::env::set_var("FOLDB_USER_ID", "test_user");

        let scratch = tempdir().unwrap();
        let config = LambdaConfig::new(
            crate::storage::DatabaseConfig::Local {
                path: scratch.path().join("db"),
            },
            crate::lambda::config::LambdaLogging::Stdout,
        );
        let manager = NodeManager::new(config)
            .await
            .expect("Failed to create manager");

        let first = manager
            .get_node("user1")
            .await
            .expect("Failed to get node1");
        let second = manager
            .get_node("user2")
            .await
            .expect("Failed to get node2");

        let first_id = first.lock().await.get_node_id().to_string();
        let second_id = second.lock().await.get_node_id().to_string();
        assert_eq!(
            first_id, second_id,
            "In single mode, all users should get the same node"
        );
    }
}