use crate::hierarchy::deltas::{CellDelta, CoalitionDelta, CohortDelta, FederationDelta};
use crate::hierarchy::storage_trait::{DocumentMetrics, SummaryStorage};
use crate::Result;
use peat_schema::hierarchy::v1::{CellSummary, CoalitionSummary, CohortSummary, FederationSummary};
use std::sync::Arc;
use tracing::instrument;
/// Facade over a shared [`SummaryStorage`] backend for creating, updating and
/// reading hierarchy summaries (cell, cohort, federation and coalition).
///
/// The type holds no state of its own besides the storage handle.
pub struct HierarchicalAggregator {
/// Storage backend that the methods on this type delegate to.
storage: Arc<dyn SummaryStorage>,
}
impl HierarchicalAggregator {
    /// Builds an aggregator that delegates every operation to `storage`.
    pub fn new(storage: Arc<dyn SummaryStorage>) -> Self {
        Self { storage }
    }

    // NOTE on the `#[instrument]` attributes below: each identifier is recorded
    // with an explicit value (`fields(id = %id)`). A bare `fields(id)` declares
    // an *empty* span field that also replaces the automatically captured
    // argument of the same name, so the identifier would never appear on the
    // span unless the body called `Span::record` — which none of these do.

    /// Creates the summary for `cell_id` from `initial_state`.
    ///
    /// Delegates to the storage backend and returns its `String` result
    /// unchanged (presumably the created document's ID — confirm against
    /// `SummaryStorage`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, initial_state), fields(cell_id = %cell_id))]
    pub async fn create_cell_summary(
        &self,
        cell_id: &str,
        initial_state: &CellSummary,
    ) -> Result<String> {
        self.storage
            .create_cell_summary(cell_id, initial_state)
            .await
    }

    /// Applies `delta` to the summary of `cell_id` via the storage backend.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, delta), fields(cell_id = %cell_id))]
    pub async fn update_cell_summary(&self, cell_id: &str, delta: CellDelta) -> Result<()> {
        self.storage.update_cell_summary(cell_id, delta).await
    }

    /// Fetches the summary of `cell_id`; `Ok(None)` is whatever the storage
    /// backend reports for a missing summary.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self), fields(cell_id = %cell_id))]
    pub async fn get_cell_summary(&self, cell_id: &str) -> Result<Option<CellSummary>> {
        self.storage.get_cell_summary(cell_id).await
    }

    /// Creates the summary for `cohort_id` from `initial_state`.
    ///
    /// Delegates to the storage backend and returns its `String` result
    /// unchanged (presumably the created document's ID — confirm against
    /// `SummaryStorage`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, initial_state), fields(cohort_id = %cohort_id))]
    pub async fn create_cohort_summary(
        &self,
        cohort_id: &str,
        initial_state: &CohortSummary,
    ) -> Result<String> {
        self.storage
            .create_cohort_summary(cohort_id, initial_state)
            .await
    }

    /// Applies `delta` to the summary of `cohort_id` via the storage backend.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, delta), fields(cohort_id = %cohort_id))]
    pub async fn update_cohort_summary(&self, cohort_id: &str, delta: CohortDelta) -> Result<()> {
        self.storage.update_cohort_summary(cohort_id, delta).await
    }

    /// Fetches the summary of `cohort_id`; `Ok(None)` is whatever the storage
    /// backend reports for a missing summary.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self), fields(cohort_id = %cohort_id))]
    pub async fn get_cohort_summary(&self, cohort_id: &str) -> Result<Option<CohortSummary>> {
        self.storage.get_cohort_summary(cohort_id).await
    }

    /// Creates the summary for `federation_id` from `initial_state`.
    ///
    /// Delegates to the storage backend and returns its `String` result
    /// unchanged (presumably the created document's ID — confirm against
    /// `SummaryStorage`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, initial_state), fields(federation_id = %federation_id))]
    pub async fn create_federation_summary(
        &self,
        federation_id: &str,
        initial_state: &FederationSummary,
    ) -> Result<String> {
        self.storage
            .create_federation_summary(federation_id, initial_state)
            .await
    }

    /// Applies `delta` to the summary of `federation_id` via the storage
    /// backend.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, delta), fields(federation_id = %federation_id))]
    pub async fn update_federation_summary(
        &self,
        federation_id: &str,
        delta: FederationDelta,
    ) -> Result<()> {
        self.storage
            .update_federation_summary(federation_id, delta)
            .await
    }

    /// Fetches the summary of `federation_id`; `Ok(None)` is whatever the
    /// storage backend reports for a missing summary.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self), fields(federation_id = %federation_id))]
    pub async fn get_federation_summary(
        &self,
        federation_id: &str,
    ) -> Result<Option<FederationSummary>> {
        self.storage.get_federation_summary(federation_id).await
    }

    /// Creates the summary for `coalition_id` from `initial_state`.
    ///
    /// Delegates to the storage backend and returns its `String` result
    /// unchanged (presumably the created document's ID — confirm against
    /// `SummaryStorage`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, initial_state), fields(coalition_id = %coalition_id))]
    pub async fn create_coalition_summary(
        &self,
        coalition_id: &str,
        initial_state: &CoalitionSummary,
    ) -> Result<String> {
        self.storage
            .create_coalition_summary(coalition_id, initial_state)
            .await
    }

    /// Applies `delta` to the summary of `coalition_id` via the storage
    /// backend.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self, delta), fields(coalition_id = %coalition_id))]
    pub async fn update_coalition_summary(
        &self,
        coalition_id: &str,
        delta: CoalitionDelta,
    ) -> Result<()> {
        self.storage
            .update_coalition_summary(coalition_id, delta)
            .await
    }

    /// Fetches the summary of `coalition_id`; `Ok(None)` is whatever the
    /// storage backend reports for a missing summary.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self), fields(coalition_id = %coalition_id))]
    pub async fn get_coalition_summary(
        &self,
        coalition_id: &str,
    ) -> Result<Option<CoalitionSummary>> {
        self.storage.get_coalition_summary(coalition_id).await
    }

    /// Fetches the [`DocumentMetrics`] the storage backend reports for
    /// `doc_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the storage backend.
    #[instrument(skip(self), fields(doc_id = %doc_id))]
    pub async fn get_document_metrics(&self, doc_id: &str) -> Result<DocumentMetrics> {
        self.storage.get_document_metrics(doc_id).await
    }

    /// Fetches the metrics for `doc_id` and returns the outcome of
    /// `DocumentMetrics::validate` on them.
    ///
    /// # Errors
    ///
    /// Returns the storage error if the metrics cannot be fetched, otherwise
    /// whatever error `validate` reports.
    #[instrument(skip(self), fields(doc_id = %doc_id))]
    pub async fn validate_document(&self, doc_id: &str) -> Result<()> {
        let metrics = self.get_document_metrics(doc_id).await?;
        metrics.validate()
    }

    /// Creates the summary for `cell_id` if none exists yet.
    ///
    /// When a summary is already stored, `summary` is **ignored**: nothing is
    /// written and the string `"{cell_id}-summary"` is returned. Otherwise the
    /// call is forwarded to [`Self::create_cell_summary`].
    ///
    /// The existence check and the create are two separate storage calls, so
    /// the pair is not atomic with respect to other writers.
    ///
    /// # Errors
    ///
    /// Propagates any error from the lookup or from the create.
    #[deprecated(note = "Use create_cell_summary() once, then update_cell_summary() for updates")]
    #[instrument(skip(self, summary), fields(cell_id = %cell_id))]
    pub async fn upsert_cell_summary(
        &self,
        cell_id: &str,
        summary: &CellSummary,
    ) -> Result<String> {
        match self.get_cell_summary(cell_id).await? {
            // Already present: no update is applied, only the ID string is
            // synthesized. NOTE(review): assumes the backend names documents
            // "<id>-summary" — confirm against the storage implementation.
            Some(_) => Ok(format!("{}-summary", cell_id)),
            None => self.create_cell_summary(cell_id, summary).await,
        }
    }

    /// Creates the summary for `cohort_id` if none exists yet.
    ///
    /// When a summary is already stored, `summary` is **ignored**: nothing is
    /// written and the string `"{cohort_id}-summary"` is returned. Otherwise
    /// the call is forwarded to [`Self::create_cohort_summary`].
    ///
    /// The existence check and the create are two separate storage calls, so
    /// the pair is not atomic with respect to other writers.
    ///
    /// # Errors
    ///
    /// Propagates any error from the lookup or from the create.
    #[deprecated(
        note = "Use create_cohort_summary() once, then update_cohort_summary() for updates"
    )]
    #[instrument(skip(self, summary), fields(cohort_id = %cohort_id))]
    pub async fn upsert_cohort_summary(
        &self,
        cohort_id: &str,
        summary: &CohortSummary,
    ) -> Result<String> {
        match self.get_cohort_summary(cohort_id).await? {
            // Already present: no update is applied, only the ID string is
            // synthesized. NOTE(review): assumes the backend names documents
            // "<id>-summary" — confirm against the storage implementation.
            Some(_) => Ok(format!("{}-summary", cohort_id)),
            None => self.create_cohort_summary(cohort_id, summary).await,
        }
    }

    /// Exposes the underlying storage handle; hidden from the generated docs.
    #[doc(hidden)]
    pub fn storage(&self) -> &Arc<dyn SummaryStorage> {
        &self.storage
    }
}
#[cfg(test)]
mod tests {
// NOTE(review): placeholder — the body is empty, so this test always passes
// and exercises nothing. Building a `HierarchicalAggregator` needs a
// `SummaryStorage` implementation, which this module does not define.
#[test]
fn test_coordinator_creation() {
}
}