use std::sync::Arc;
use async_trait::async_trait;
use futures::{TryFutureExt, future::join_all};
use itertools::Itertools;
use rspack_paths::{Utf8Path, Utf8PathBuf};
use super::{SplitPackStrategy, util::get_indexed_packs};
use crate::{
FileSystem,
error::{Error, ErrorType, Result},
pack::{
data::{Pack, PackFileMeta, PackKeys, PackScope, ScopeMeta},
strategy::{PackMainContents, PackReadStrategy, ScopeReadStrategy},
},
};
#[async_trait]
impl ScopeReadStrategy for SplitPackStrategy {
async fn ensure_meta(&self, scope: &mut PackScope) -> Result<()> {
if !scope.meta.loaded() {
let meta_path = ScopeMeta::get_path(&scope.path);
let meta = read_scope_meta(scope.name, &meta_path, self.fs.clone())
.await?
.unwrap_or_else(|| ScopeMeta::new(&scope.path, &scope.options));
scope.meta.set_value(meta);
}
Ok(())
}
async fn ensure_packs(&self, scope: &mut PackScope) -> Result<()> {
self.ensure_meta(scope).await?;
if !scope.packs.loaded() {
scope.packs.set_value(
scope
.meta
.expect_value()
.packs
.iter()
.enumerate()
.map(|(bucket_id, bucket_pack_metas)| {
let bucket_path = scope.path.join(bucket_id.to_string());
bucket_pack_metas
.iter()
.map(|pack_meta| Pack::new(bucket_path.join(&pack_meta.name)))
.collect_vec()
})
.collect_vec(),
);
}
Ok(())
}
async fn ensure_keys(&self, scope: &mut PackScope) -> Result<()> {
self.ensure_packs(scope).await?;
let read_key_results = read_keys(scope, self).await?;
let packs = scope.packs.expect_value_mut();
for result in read_key_results {
if let Some(pack) = packs
.get_mut(result.bucket_id)
.and_then(|packs| packs.get_mut(result.pack_pos))
{
pack.keys.set_value(result.keys);
}
}
Ok(())
}
async fn ensure_contents(&self, scope: &mut PackScope) -> Result<()> {
self.ensure_keys(scope).await?;
let read_content_results = read_contents(scope, self).await?;
let packs = scope.packs.expect_value_mut();
for result in read_content_results {
if let Some(pack) = packs
.get_mut(result.bucket_id)
.and_then(|packs| packs.get_mut(result.pack_pos))
{
pack.contents.set_value(result.contents.contents);
pack.generations = result.contents.generations;
}
}
Ok(())
}
fn get_path(&self, str: &str) -> Utf8PathBuf {
self.root.join(str)
}
}
/// Reads and parses the scope meta file at `path`.
///
/// Expected layout:
/// - first line: space-separated numbers, the first three being
///   `<bucket_size> <pack_size> <generation>`;
/// - followed by `bucket_size` lines, one per bucket, each holding space-separated
///   pack entries of the form `<name>,<hash>,<size>,<generation>` (extra fields are
///   ignored, empty entries are skipped).
///
/// Every parsed pack entry is marked `wrote: true`.
///
/// Returns `Ok(None)` when the file does not exist.
///
/// # Errors
/// Propagates file-system errors, and returns an `ErrorType::Load` error tagged with
/// `scope` when the header has fewer than three numbers, a pack entry has fewer than
/// four fields, or any numeric field fails to parse.
async fn read_scope_meta(
  scope: &'static str,
  path: &Utf8Path,
  fs: Arc<dyn FileSystem>,
) -> Result<Option<ScopeMeta>> {
  if !fs.exists(path).await? {
    return Ok(None);
  }
  let mut reader = fs.read_file(path).await?;

  let option_items = reader
    .read_line()
    .await?
    .split(" ")
    .map(|item| {
      item.parse::<usize>().map_err(|e| {
        Error::from_reason(
          Some(ErrorType::Load),
          Some(scope),
          format!("parse option meta failed: {e}"),
        )
      })
    })
    .collect::<Result<Vec<usize>>>()?;
  if option_items.len() < 3 {
    return Err(Error::from_reason(
      Some(ErrorType::Load),
      Some(scope),
      "option meta not match".to_string(),
    ));
  }
  let bucket_size = option_items[0];
  let pack_size = option_items[1];
  let generation = option_items[2];

  // Shared error mapping for the numeric columns of a pack entry.
  let parse_file_meta_field = move |field: &str| {
    field.parse::<usize>().map_err(|e| {
      Error::from_reason(
        Some(ErrorType::Load),
        Some(scope),
        format!("parse file meta failed: {e}"),
      )
    })
  };

  // Each iteration either pushes exactly one bucket or returns an error, so
  // `packs.len() == bucket_size` holds after the loop.
  let mut packs = vec![];
  for _ in 0..bucket_size {
    let line = reader.read_line().await?;
    let bucket_packs = line
      .split(" ")
      .filter(|entry| !entry.is_empty())
      .map(|entry| -> Result<PackFileMeta> {
        let fields = entry.split(",").collect::<Vec<_>>();
        // name, hash, size and generation are all required: fields[3] is read below,
        // so anything shorter must be rejected here instead of panicking on index.
        if fields.len() < 4 {
          return Err(Error::from_reason(
            Some(ErrorType::Load),
            Some(scope),
            "file meta not match".to_string(),
          ));
        }
        Ok(PackFileMeta {
          name: fields[0].to_owned(),
          hash: fields[1].to_owned(),
          size: parse_file_meta_field(fields[2])?,
          generation: parse_file_meta_field(fields[3])?,
          wrote: true,
        })
      })
      .collect::<Result<Vec<PackFileMeta>>>()?;
    packs.push(bucket_packs);
  }

  Ok(Some(ScopeMeta {
    path: path.to_path_buf(),
    bucket_size,
    pack_size,
    generation,
    packs,
  }))
}
/// Keys read from one pack file, tagged with the pack's location in the scope.
#[derive(Debug)]
struct ReadKeysResult {
  /// Index of the bucket the pack belongs to.
  pub bucket_id: usize,
  /// Position of the pack inside its bucket.
  pub pack_pos: usize,
  /// Keys read from the pack file.
  pub keys: PackKeys,
}

/// Selects packs whose keys have not been loaded yet.
fn read_keys_filter(pack: &Pack, _meta: &PackFileMeta) -> bool {
  !pack.keys.loaded()
}

/// Reads the keys of every pack in `scope` that has no keys loaded yet.
///
/// Each selected pack is read on its own `tokio::spawn`ed task. A pack whose reader
/// yields `None` is reported with default keys.
///
/// # Errors
/// Returns the first failure in pack order: either a task join failure (wrapped as an
/// `ErrorType::Load` error tagged with the scope name) or a key-read error.
async fn read_keys(scope: &PackScope, strategy: &SplitPackStrategy) -> Result<Vec<ReadKeysResult>> {
  let (pack_indexes, packs) = get_indexed_packs(scope, Some(&read_keys_filter));

  let handles = packs
    .into_iter()
    .map(|entry| {
      let task_strategy = strategy.clone();
      let pack_path = entry.1.path.clone();
      tokio::spawn(async move { task_strategy.read_pack_keys(&pack_path).await })
        .map_err(|e| Error::from_error(Some(ErrorType::Load), Some(scope.name), Box::new(e)))
    })
    .collect_vec();

  // Flatten join result + read result; the first error (in pack order) wins.
  let pack_keys = join_all(handles)
    .await
    .into_iter()
    .map(|joined| joined.and_then(|read| read).map(Option::unwrap_or_default))
    .collect::<Result<Vec<_>>>()?;

  Ok(
    pack_indexes
      .into_iter()
      .zip(pack_keys)
      .map(|((bucket_id, pack_pos), keys)| ReadKeysResult {
        bucket_id,
        pack_pos,
        keys,
      })
      .collect(),
  )
}
/// Contents read from one pack file, tagged with the pack's location in the scope.
#[derive(Debug)]
struct ReadContentsResult {
  /// Index of the bucket the pack belongs to.
  pub bucket_id: usize,
  /// Position of the pack inside its bucket.
  pub pack_pos: usize,
  /// Contents (and generations) read from the pack file.
  pub contents: PackMainContents,
}

/// Selects packs that already have keys but whose contents are either not loaded
/// or have been released.
fn read_contents_filter(pack: &Pack, _meta: &PackFileMeta) -> bool {
  let contents_live = pack.contents.loaded() && !pack.contents.is_released();
  pack.keys.loaded() && !contents_live
}

/// Reads the contents of every pack in `scope` selected by `read_contents_filter`.
///
/// Each selected pack is read on its own `tokio::spawn`ed task. A pack whose reader
/// yields `None` is reported with default contents.
///
/// # Errors
/// Returns the first failure in pack order: either a task join failure (wrapped as an
/// `ErrorType::Load` error tagged with the scope name) or a content-read error.
async fn read_contents(
  scope: &PackScope,
  strategy: &SplitPackStrategy,
) -> Result<Vec<ReadContentsResult>> {
  let (pack_indexes, packs) = get_indexed_packs(scope, Some(&read_contents_filter));

  let handles = packs
    .into_iter()
    .map(|entry| {
      let task_strategy = strategy.to_owned();
      let pack_path = entry.1.path.to_owned();
      tokio::spawn(async move { task_strategy.read_pack_contents(&pack_path).await })
        .map_err(|e| Error::from_error(Some(ErrorType::Load), Some(scope.name), Box::new(e)))
    })
    .collect_vec();

  // Flatten join result + read result; the first error (in pack order) wins.
  let pack_contents = join_all(handles)
    .await
    .into_iter()
    .map(|joined| joined.and_then(|read| read).map(Option::unwrap_or_default))
    .collect::<Result<Vec<_>>>()?;

  Ok(
    pack_indexes
      .into_iter()
      .zip(pack_contents)
      .map(|((bucket_id, pack_pos), contents)| ReadContentsResult {
        bucket_id,
        pack_pos,
        contents,
      })
      .collect(),
  )
}
#[cfg(test)]
mod tests {
  use std::{collections::HashSet, sync::Arc};

  use itertools::Itertools;
  use rspack_paths::Utf8Path;

  use crate::{
    FileSystem,
    error::Result,
    pack::{
      data::{PackOptions, PackScope, ScopeMeta},
      strategy::{
        ScopeReadStrategy, SplitPackStrategy,
        split::util::test_pack_utils::{
          clean_strategy, create_strategies, mock_pack_file, mock_scope_meta_file,
        },
      },
    },
  };

  /// Writes a mock scope under `path`: a scope meta file plus three pack files per
  /// bucket, located at `<path>/<bucket_id>/pack_name_<bucket_id>_<pack_no>`.
  ///
  /// NOTE(review): the `3` passed to `mock_scope_meta_file` and the `10` passed to
  /// `mock_pack_file` presumably mean "3 packs per bucket" and "10 entries per pack";
  /// confirm against `test_pack_utils`.
  async fn mock_scope(path: &Utf8Path, fs: &dyn FileSystem, options: &PackOptions) -> Result<()> {
    mock_scope_meta_file(&ScopeMeta::get_path(path), fs, options, 3).await?;
    for bucket_id in 0..options.bucket_size {
      for pack_no in 0..3 {
        let unique_id = format!("{bucket_id}_{pack_no}");
        let pack_name = format!("pack_name_{bucket_id}_{pack_no}");
        let pack_path = path.join(format!("./{bucket_id}/{pack_name}"));
        mock_pack_file(&pack_path, &unique_id, 10, fs).await?;
      }
    }
    Ok(())
  }

  /// Loads the scope meta and checks it matches the scope options and mock layout.
  ///
  /// The expected pack list is hard-coded for bucket 0 only, so this helper assumes
  /// the scope was created with `bucket_size == 1`.
  async fn test_read_meta(scope: &mut PackScope, strategy: &SplitPackStrategy) -> Result<()> {
    strategy.ensure_meta(scope).await?;
    let meta = scope.meta.expect_value();
    assert_eq!(meta.path, ScopeMeta::get_path(&scope.path));
    assert_eq!(meta.bucket_size, scope.options.bucket_size);
    assert_eq!(meta.pack_size, scope.options.pack_size);
    assert_eq!(meta.packs.len(), scope.options.bucket_size);
    assert_eq!(
      meta
        .packs
        .iter()
        .flatten()
        .map(|i| (i.name.as_str(), i.hash.as_str(), i.size, i.wrote))
        .collect_vec(),
      vec![
        ("pack_name_0_0", "pack_hash_0_0", 100, true),
        ("pack_name_0_1", "pack_hash_0_1", 100, true),
        ("pack_name_0_2", "pack_hash_0_2", 100, true),
      ]
    );
    Ok(())
  }

  /// Loads keys and then contents for all packs, and checks that the last entry of
  /// the last pack in the last bucket (`key_/val_<bucket>_2_9`) is present.
  async fn test_read_packs(scope: &mut PackScope, strategy: &SplitPackStrategy) -> Result<()> {
    strategy.ensure_keys(scope).await?;
    // Collect every loaded key across all buckets/packs as owned bytes.
    let all_keys = scope
      .packs
      .expect_value()
      .iter()
      .flatten()
      .flat_map(|pack| pack.keys.expect_value().to_owned())
      .map(|x| x.as_ref().to_owned())
      .collect::<HashSet<_>>();
    assert!(
      all_keys.contains(format!("key_{}_{}_{}", scope.options.bucket_size - 1, 2, 9).as_bytes())
    );
    strategy.ensure_contents(scope).await?;
    // Same as above, but for the loaded contents.
    let all_contents = scope
      .packs
      .expect_value()
      .iter()
      .flatten()
      .flat_map(|pack| pack.contents.expect_value().to_owned())
      .map(|x| x.as_ref().to_owned())
      .collect::<HashSet<_>>();
    assert!(
      all_contents
        .contains(format!("val_{}_{}_{}", scope.options.bucket_size - 1, 2, 9).as_bytes())
    );
    Ok(())
  }

  /// For every test strategy: reset its storage, write a mock scope with one
  /// bucket, then verify that meta, keys and contents can be read back.
  #[tokio::test]
  #[cfg_attr(miri, ignore)]
  async fn should_read_scope() -> Result<()> {
    for strategy in create_strategies("read_scope") {
      clean_strategy(&strategy).await;
      // bucket_size must stay 1: `test_read_meta` hard-codes bucket 0's packs.
      let options = Arc::new(PackOptions {
        bucket_size: 1,
        pack_size: 16,
      });
      let mut scope = PackScope::new(
        "scope_name",
        strategy.get_path("scope_name"),
        options.clone(),
      );
      mock_scope(&scope.path, strategy.fs.as_ref(), &scope.options)
        .await
        .expect("should mock packs");
      test_read_meta(&mut scope, &strategy).await?;
      test_read_packs(&mut scope, &strategy).await?;
    }
    Ok(())
  }
}