use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::errors::GroupResult;
use crate::manifest::expansion::ExpandedEntity;
/// Upper bound on the summed estimated row count of one shard. `build_shard_plan` closes the
/// current shard when adding the next entity would push the running total above this value.
/// Entities are never split, so a single entity that alone exceeds it still gets its own shard.
// NOTE(review): 10_000_000_000 rows at BYTES_PER_ROW_ESTIMATE (800 B) estimates ~8,000,000 MB
// per shard — confirm this magnitude is intended (and not e.g. 10_000_000).
const MAX_ROWS_PER_SHARD: u64 = 10_000_000_000;
/// Estimated bytes per row, used by `make_shard` to derive `estimated_archive_size_mb`.
const BYTES_PER_ROW_ESTIMATE: u64 = 800;
/// Per-entity row estimate used when an entity has no `rows` value and its scoping profile
/// has no usable `row_budget` entry (see `read_row_budget`).
const FALLBACK_ROW_BUDGET: u64 = 1_000_000;
/// Decimal megabyte (10^6 bytes) divisor for converting the byte estimate to MB.
const BYTES_PER_MB: u64 = 1_000_000;
/// One shard of the plan: a group of entities that share a scoping profile.
// NOTE(review): field names are part of the serde (de)serialized form — renaming them is a
// format change.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardAssignment {
/// Identifier of the form `S_<initial>_<index>`, with the index zero-padded to at least
/// four digits (built by `make_shard`).
pub shard_id: String,
/// Scoping profile shared by every entity in this shard.
pub scoping_profile: String,
/// Codes of the entities assigned to this shard; `build_shard_plan` emits them in
/// ascending code order.
pub entity_codes: Vec<String>,
/// Saturating sum of the per-entity row estimates for this shard.
pub estimated_rows: u64,
/// `estimated_rows * BYTES_PER_ROW_ESTIMATE / BYTES_PER_MB`, multiplied with saturation
/// and rounded down by integer division.
pub estimated_archive_size_mb: u64,
}
/// The complete set of shards produced by `build_shard_plan`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShardPlan {
/// Shards in emission order: grouped by scoping profile (profiles in sorted order), then by
/// the order in which each profile's shards were filled.
pub shards: Vec<ShardAssignment>,
}
/// Partitions `entities` into shards, grouped by scoping profile.
///
/// Entities are grouped by `scoping_profile` (profiles are visited in sorted order) and,
/// within a profile, ordered by `code`. Each entity contributes its `rows` value or, when
/// that is absent, the profile's `row_budget` from `scoping_profiles` (falling back to
/// `FALLBACK_ROW_BUDGET`). Entities are packed greedily into the current shard until adding
/// the next one would push the total above `MAX_ROWS_PER_SHARD`. An entity is never split,
/// so a single oversized entity ends up in a shard of its own.
///
/// Shard ids have the form `S_<initial>_<nnnn>`. Indices are allocated per *initial* across
/// all profiles rather than restarting at 1 for every profile. Two profiles whose names
/// reduce to the same initial (e.g. `significant` and a custom `sig`, both `SIG`) therefore
/// still get distinct shard ids instead of colliding. When no initials collide, the ids are
/// the same as per-profile numbering.
///
/// # Errors
/// The current implementation always returns `Ok`.
pub fn build_shard_plan(
    entities: &[ExpandedEntity],
    scoping_profiles: &BTreeMap<String, serde_yaml::Value>,
) -> GroupResult<ShardPlan> {
    // BTreeMap keys give a deterministic profile iteration order.
    let mut profile_entities: BTreeMap<String, Vec<&ExpandedEntity>> = BTreeMap::new();
    for entity in entities {
        profile_entities
            .entry(entity.scoping_profile.clone())
            .or_default()
            .push(entity);
    }
    for ents in profile_entities.values_mut() {
        ents.sort_by(|a, b| a.code.cmp(&b.code));
    }

    let mut shards: Vec<ShardAssignment> = Vec::new();
    // Next free shard index for each initial, shared by every profile mapping to it, so
    // shard ids stay unique even when `profile_initial` collides.
    let mut next_index_by_initial: BTreeMap<String, u32> = BTreeMap::new();

    for (profile, profile_ents) in &profile_entities {
        if profile_ents.is_empty() {
            continue;
        }
        let row_budget_per_entity = read_row_budget(scoping_profiles, profile);
        let initial = profile_initial(profile);
        let shard_index = next_index_by_initial.entry(initial.clone()).or_insert(1);

        let mut current_codes: Vec<String> = Vec::new();
        let mut current_rows: u64 = 0;
        for entity in profile_ents {
            let entity_rows = entity.rows.unwrap_or(row_budget_per_entity);
            // Close the current shard only if it already holds something; an empty shard
            // always accepts the next entity, even an oversized one.
            if !current_codes.is_empty()
                && current_rows.saturating_add(entity_rows) > MAX_ROWS_PER_SHARD
            {
                shards.push(make_shard(
                    &initial,
                    *shard_index,
                    profile,
                    std::mem::take(&mut current_codes),
                    current_rows,
                ));
                *shard_index += 1;
                current_rows = 0;
            }
            current_codes.push(entity.code.clone());
            current_rows = current_rows.saturating_add(entity_rows);
        }
        if !current_codes.is_empty() {
            shards.push(make_shard(
                &initial,
                *shard_index,
                profile,
                current_codes,
                current_rows,
            ));
            // Reserve this index so a later profile with the same initial starts after it.
            *shard_index += 1;
        }
    }
    Ok(ShardPlan { shards })
}
/// Derives the three-letter code embedded in shard ids for a scoping profile.
///
/// The built-in profiles `significant`, `material` and `consolidation_only` map to `SIG`,
/// `MAT` and `CON`. Any other profile uses its first three ASCII letters, upper-cased and
/// right-padded with `X` to exactly three characters. A profile containing no ASCII letters
/// yields `UNK`.
fn profile_initial(profile: &str) -> String {
    let well_known = match profile {
        "significant" => Some("SIG"),
        "material" => Some("MAT"),
        "consolidation_only" => Some("CON"),
        _ => None,
    };
    if let Some(code) = well_known {
        return code.to_string();
    }

    let mut code: String = profile
        .chars()
        .filter(char::is_ascii_alphabetic)
        .take(3)
        .map(|ch| ch.to_ascii_uppercase())
        .collect();
    if code.is_empty() {
        return "UNK".to_string();
    }
    // Pad short prefixes so every derived initial is exactly three characters long.
    while code.len() < 3 {
        code.push('X');
    }
    code
}
/// Returns the per-entity row budget configured for `profile`.
///
/// Reads `scoping_profiles[profile]["row_budget"]`. It falls back to `FALLBACK_ROW_BUDGET`
/// in three cases: the profile is missing, the key is absent, or the value is not
/// representable as a `u64` (per `serde_yaml::Value::as_u64`).
fn read_row_budget(scoping_profiles: &BTreeMap<String, serde_yaml::Value>, profile: &str) -> u64 {
    let configured = match scoping_profiles.get(profile) {
        Some(profile_cfg) => profile_cfg
            .get("row_budget")
            .and_then(serde_yaml::Value::as_u64),
        None => None,
    };
    configured.unwrap_or(FALLBACK_ROW_BUDGET)
}
impl ShardPlan {
    /// Builds a lookup from entity code to the id of the shard that contains it.
    ///
    /// Shards are scanned in order. If the same code appears in more than one shard, the
    /// later shard's id wins.
    pub fn shard_by_code(&self) -> BTreeMap<String, String> {
        let mut by_code = BTreeMap::new();
        // `extend` inserts pair by pair, preserving last-insert-wins for duplicate codes.
        by_code.extend(self.shards.iter().flat_map(|shard| {
            shard
                .entity_codes
                .iter()
                .map(move |code| (code.clone(), shard.shard_id.clone()))
        }));
        by_code
    }
}
/// Assembles a `ShardAssignment` from its parts.
///
/// The id is `S_<initial>_<index>`, with the index zero-padded to at least four digits. The
/// archive size is estimated as `estimated_rows * BYTES_PER_ROW_ESTIMATE` bytes (saturating
/// on overflow), converted to whole MB by truncating division by `BYTES_PER_MB`.
fn make_shard(
    initial: &str,
    index: u32,
    profile: &str,
    entity_codes: Vec<String>,
    estimated_rows: u64,
) -> ShardAssignment {
    let approx_bytes = estimated_rows.saturating_mul(BYTES_PER_ROW_ESTIMATE);
    ShardAssignment {
        shard_id: format!("S_{}_{:04}", initial, index),
        scoping_profile: String::from(profile),
        entity_codes,
        estimated_rows,
        estimated_archive_size_mb: approx_bytes / BYTES_PER_MB,
    }
}