use std::sync::Arc;
use anyhow::Result;
use uni_common::config::UniConfig;
use uni_common::core::fork::ForkId;
use uni_common::core::schema::SchemaManager;
use crate::runtime::writer::Writer;
use crate::storage::manager::StorageManager;
use super::{id_alloc, wal as fork_wal};
/// Builds a [`Writer`] whose ID allocator and write-ahead log belong to a single fork.
///
/// The steps run in this order:
/// 1. A fork allocator is created on the storage's store via
///    `id_alloc::new_for_fork_arc`, passing `id_alloc::DEFAULT_FORK_BATCH_SIZE`.
/// 2. The fork WAL is created on the same store, given the storage's local
///    filesystem root, wrapped in an `Arc`, and initialized.
/// 3. The writer is constructed with that WAL and allocator, and `fork_id` is
///    recorded on the writer's `fork_id` field.
///
/// # Errors
///
/// Returns the first error produced by allocator construction, WAL
/// initialization, or `Writer::new_with_config`.
pub async fn new_for_fork(
    storage: Arc<StorageManager>,
    schema_manager: Arc<SchemaManager>,
    fork_id: &ForkId,
    config: UniConfig,
) -> Result<Writer> {
    let object_store = storage.store();

    // The allocator gets its own handle; the original handle is moved into the WAL below.
    let fork_allocator = id_alloc::new_for_fork_arc(
        object_store.clone(),
        fork_id,
        id_alloc::DEFAULT_FORK_BATCH_SIZE,
    )
    .await?;

    let fork_log =
        fork_wal::new_for_fork(object_store, fork_id).with_local_root(storage.local_fs_root());
    let fork_log = Arc::new(fork_log);
    fork_log.initialize().await?;

    // NOTE(review): the meaning of the literal `0` (third argument) is defined by
    // `Writer::new_with_config`, which is not visible here — confirm before changing it.
    let mut fork_writer = Writer::new_with_config(
        storage,
        schema_manager,
        0,
        config,
        Some(fork_log),
        Some(fork_allocator),
    )
    .await?;

    // Tag the writer with the fork it was built for.
    fork_writer.fork_id = Some(*fork_id);
    Ok(fork_writer)
}
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::ObjectStore;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path as ObjectStorePath;
    use tempfile::TempDir;

    /// Creates a primary storage and schema manager inside a fresh temporary directory.
    ///
    /// The schema is loaded from `schema.json` through a local-filesystem object
    /// store prefixed at the temp directory, and the storage manager lives in its
    /// `storage` subdirectory. The `TempDir` is returned so the caller can keep it
    /// alive: `tempfile` deletes the directory when the guard is dropped.
    async fn primary_storage() -> (TempDir, Arc<StorageManager>, Arc<SchemaManager>) {
        let dir = TempDir::new().unwrap();

        let schema_store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        let schema = Arc::new(
            SchemaManager::load_from_store(schema_store, &ObjectStorePath::from("schema.json"))
                .await
                .unwrap(),
        );

        let storage_path = dir.path().join("storage");
        std::fs::create_dir_all(&storage_path).unwrap();
        let storage = StorageManager::new_with_config(
            storage_path.to_str().unwrap(),
            Arc::clone(&schema),
            UniConfig::default(),
        )
        .await
        .unwrap();

        (dir, Arc::new(storage), schema)
    }

    /// A writer built for a new fork hands out `0` from its first `allocate_vid` call.
    #[tokio::test]
    async fn new_for_fork_builds_writer_with_fork_allocator() {
        let (_dir, storage, schema) = primary_storage().await;
        let fork_id = ForkId::new();

        // Last use of both handles, so they are moved rather than cloned.
        let writer = new_for_fork(storage, schema, &fork_id, UniConfig::default())
            .await
            .unwrap();

        let v = writer.allocator.allocate_vid().await.unwrap();
        assert_eq!(u64::from(v), 0, "first id from a new fork allocator");
    }

    /// Two writers for different forks over the same storage each hand out `0`
    /// first, i.e. allocating on one does not advance the other.
    #[tokio::test]
    async fn two_fork_writers_have_independent_allocators() {
        let (_dir, storage, schema) = primary_storage().await;
        let id_a = ForkId::new();
        let id_b = ForkId::new();

        let writer_a = new_for_fork(
            Arc::clone(&storage),
            Arc::clone(&schema),
            &id_a,
            UniConfig::default(),
        )
        .await
        .unwrap();
        // Second writer takes ownership of the handles; nothing uses them afterwards.
        let writer_b = new_for_fork(storage, schema, &id_b, UniConfig::default())
            .await
            .unwrap();

        assert_eq!(
            u64::from(writer_a.allocator.allocate_vid().await.unwrap()),
            0,
            "first id from fork A"
        );
        assert_eq!(
            u64::from(writer_b.allocator.allocate_vid().await.unwrap()),
            0,
            "first id from fork B"
        );
    }
}