use sha3::{Digest, Sha3_256};
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;
use ulid::Ulid;
use crate::{cursor::Args, metadata::Metadata, Event, Executor, ReadAggregator};
pub fn hash_ids(ids: Vec<impl Into<String>>) -> String {
let mut hasher = Sha3_256::new();
for id in ids {
hasher.update(id.into());
}
hex::encode(hasher.finalize())
}
#[derive(Debug, Error)]
pub enum WriteError {
#[error("invalid original version")]
InvalidOriginalVersion,
#[error("trying to commit event without data")]
MissingData,
#[error("{0}")]
Unknown(#[from] anyhow::Error),
#[error("systemtime >> {0}")]
SystemTime(#[from] std::time::SystemTimeError),
}
pub trait Aggregator: Default {
fn aggregator_type() -> &'static str;
}
pub trait AggregatorEvent: Aggregator {
fn event_name() -> &'static str;
}
#[derive(Clone)]
pub struct AggregatorBuilder {
aggregator_id: String,
aggregator_type: String,
routing_key: Option<String>,
routing_key_locked: bool,
original_version: u16,
data: Vec<(&'static str, Vec<u8>)>,
metadata: Metadata,
}
impl AggregatorBuilder {
pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
AggregatorBuilder {
aggregator_id: aggregator_id.into(),
aggregator_type: "".to_owned(),
routing_key: None,
routing_key_locked: false,
original_version: 0,
data: Vec::default(),
metadata: Default::default(),
}
}
pub fn ids(ids: Vec<impl Into<String>>) -> AggregatorBuilder {
Self::new(hash_ids(ids))
}
pub fn original_version(&mut self, v: u16) -> &mut Self {
self.original_version = v;
self
}
pub fn routing_key(&mut self, v: impl Into<String>) -> &mut Self {
self.routing_key_opt(Some(v.into()))
}
pub fn routing_key_opt(&mut self, v: Option<String>) -> &mut Self {
if !self.routing_key_locked {
self.routing_key = v;
self.routing_key_locked = true;
}
self
}
pub fn metadata<M>(&mut self, key: impl Into<String>, value: &M) -> &mut Self
where
M: bitcode::Encode,
{
self.metadata.insert_enc(key, value);
self
}
pub fn requested_by(&mut self, value: impl Into<String>) -> &mut Self {
self.metadata.set_requested_by(value);
self
}
pub fn requested_as(&mut self, value: impl Into<String>) -> &mut Self {
self.metadata.set_requested_as(value);
self
}
pub fn metadata_from(&mut self, value: impl Into<Metadata>) -> &mut Self {
self.metadata = value.into();
self
}
pub fn event<D>(&mut self, v: &D) -> &mut Self
where
D: AggregatorEvent + bitcode::Encode,
{
self.data.push((D::event_name(), bitcode::encode(v)));
self.aggregator_type = D::aggregator_type().to_owned();
self
}
pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
let first_event = executor
.read(
Some(vec![ReadAggregator::id(
&self.aggregator_type,
&self.aggregator_id,
)]),
None,
Args::forward(1, None),
)
.await
.map_err(WriteError::Unknown)?;
let routing_key = match first_event.edges.first() {
Some(event) => event.node.routing_key.to_owned(),
_ => self.routing_key.to_owned(),
};
let mut events = vec![];
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
for (version, (name, data)) in (self.original_version + 1..).zip(&self.data) {
let event = Event {
id: Ulid::new(),
name: name.to_string(),
data: data.to_vec(),
metadata: self.metadata.clone(),
timestamp: now.as_secs(),
timestamp_subsec: now.subsec_millis(),
aggregator_id: self.aggregator_id.to_owned(),
aggregator_type: self.aggregator_type.to_owned(),
version,
routing_key: routing_key.to_owned(),
};
events.push(event);
}
if events.is_empty() {
return Err(WriteError::MissingData);
}
executor.write(events).await?;
Ok(self.aggregator_id.to_owned())
}
}
pub fn create() -> AggregatorBuilder {
AggregatorBuilder::new(Ulid::new())
}
pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
AggregatorBuilder::new(id)
}
pub trait AggregatorExecutor<E: Executor> {
fn has_event<A: AggregatorEvent>(
&self,
id: impl Into<String>,
) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send;
fn original_version<A: AggregatorEvent>(
&self,
id: impl Into<String>,
) -> impl std::future::Future<Output = anyhow::Result<Option<u16>>> + Send;
}
impl<E: Executor> AggregatorExecutor<E> for E {
fn has_event<A: AggregatorEvent>(
&self,
id: impl Into<String>,
) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send {
let id = id.into();
Box::pin(async {
let result = self
.read(
Some(vec![ReadAggregator::new(
A::aggregator_type(),
id,
A::event_name(),
)]),
None,
Args::backward(1, None),
)
.await?;
Ok(!result.edges.is_empty())
})
}
fn original_version<A: AggregatorEvent>(
&self,
id: impl Into<String>,
) -> impl std::future::Future<Output = anyhow::Result<Option<u16>>> + Send {
let id = id.into();
Box::pin(async {
let result = self
.read(
Some(vec![ReadAggregator::id(A::aggregator_type(), id)]),
None,
Args::backward(1, None),
)
.await?;
Ok(result.edges.first().map(|e| e.node.version))
})
}
}