use crate::{Event, Timestamp};
use anyhow::{anyhow, bail, Result};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
/// A time-stamped slice of an event log together with the projection
/// obtained by applying the logged events.
///
/// Elements are stored as `Cow<'a, T>`, so a segment can hold borrowed or
/// owned values of `T`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Segment<'a, T>
where
T: Clone + PartialEq,
{
/// Start time of the segment. Initialised from `Utc::now()` on construction;
/// `push` rejects events older than this, and `prepend_unchecked` replaces it
/// with the prepended segment's timestamp.
timestamp: Timestamp,
/// Projection of the data held by this segment; every successfully pushed
/// event is applied to it.
snapshot: Vec<Cow<'a, T>>,
/// Events recorded by this segment, in the order they were accepted.
events: Vec<Event<'a, T>>,
}
impl<'a, T> Segment<'a, T>
where
    T: Clone + PartialEq,
{
    /// Creates an empty segment: no projected elements and no events.
    ///
    /// The segment's timestamp is taken at the moment of the call.
    pub fn new() -> Segment<'a, T> {
        Self::from_projection(Vec::new(), Vec::new())
    }

    /// Creates a segment from an existing projection and event list.
    ///
    /// The arguments are stored as given; `events` are *not* re-applied to
    /// `projection` and their ordering is not validated. The segment's
    /// timestamp is set to `Utc::now()`.
    pub fn from_projection(
        projection: Vec<Cow<'a, T>>,
        events: Vec<Event<'a, T>>,
    ) -> Segment<'a, T> {
        Self {
            timestamp: Utc::now(),
            snapshot: projection,
            events,
        }
    }

    /// Returns the segment's start timestamp.
    pub fn get_time(&self) -> &Timestamp {
        &self.timestamp
    }

    /// Returns the segment's current projection.
    pub fn get_projection(&self) -> &Vec<Cow<'a, T>> {
        &self.snapshot
    }

    /// Replays this segment's events onto `snapshot`, stopping at `timestamp`.
    ///
    /// Events are applied in stored order, and replay stops at the first event
    /// whose time is later than `timestamp`.
    ///
    /// Returns `None` when `timestamp` is earlier than the segment's own
    /// timestamp, or when any replayed event cannot be applied to the
    /// projection (the underlying error is discarded). Otherwise returns the
    /// resulting projection.
    pub fn project_at_onto(
        &self,
        timestamp: &Timestamp,
        snapshot: Vec<Cow<'a, T>>,
    ) -> Option<Vec<Cow<'a, T>>> {
        if timestamp < &self.timestamp {
            return None;
        }
        let mut projection = snapshot;
        let due_events = self
            .events
            .iter()
            .take_while(|event| event.get_time() <= timestamp);
        for event in due_events {
            // `apply_event_to` consumes its event, so replay works on a clone.
            Self::apply_event_to(&mut projection, event.clone()).ok()?;
        }
        Some(projection)
    }

    /// Appends `event` to the segment and applies it to the projection.
    ///
    /// # Errors
    ///
    /// Fails, leaving the segment untouched, when the event
    /// - is older than the segment's timestamp,
    /// - is older than the most recently logged event, or
    /// - cannot be applied to the current projection.
    pub fn push(&mut self, event: Event<'a, T>) -> Result<()> {
        let new_event_time = event.get_time();
        if new_event_time < &self.timestamp {
            bail!("Cannot accept events before the segment started");
        }
        if let Some(last_event) = self.events.last() {
            if new_event_time < last_event.get_time() {
                bail!("Cannot accept events predating the lastest logged event");
            }
        }
        self.push_unchecked(event)
    }

    /// Applies `event` to the projection and logs it, without any time checks.
    ///
    /// The event is only logged if applying it succeeded.
    fn push_unchecked(&mut self, event: Event<'a, T>) -> Result<()> {
        self.apply_event(event.clone())?;
        self.events.push(event);
        Ok(())
    }

    /// Applies `event` to this segment's own projection.
    fn apply_event(&mut self, event: Event<'a, T>) -> Result<()> {
        Self::apply_event_to(&mut self.snapshot, event)
    }

    /// Applies a single event to `snapshot`.
    ///
    /// The affected element is the first one that compares equal (via
    /// `PartialEq`) to the dereferenced event.
    ///
    /// # Errors
    ///
    /// - `Create` fails if an equal element already exists.
    /// - `Update` and `Delete` fail if no equal element exists.
    fn apply_event_to(snapshot: &mut Vec<Cow<'a, T>>, event: Event<'a, T>) -> Result<()> {
        // A read-only scan is enough to locate the element.
        let existing_index = snapshot.iter().position(|element| **element == *event);
        match &event {
            Event::Create(_) => {
                if existing_index.is_some() {
                    bail!("Cannot create pre-existing data");
                }
                snapshot.push(event.take());
                Ok(())
            }
            Event::Update(_) => match existing_index {
                Some(index) => {
                    // `index` came from `position` on this same vector, so it is in bounds.
                    snapshot[index] = event.take();
                    Ok(())
                }
                None => bail!("Cannot modify non-existent data"),
            },
            Event::Delete(_) => match existing_index {
                Some(index) => {
                    snapshot.remove(index);
                    Ok(())
                }
                None => bail!("Cannot delete non-existent data"),
            },
        }
    }

    /// Merges `other` in front of this segment after validating their order.
    ///
    /// # Errors
    ///
    /// Fails, leaving this segment untouched, when
    /// - the combined number of events would overflow `usize`, or
    /// - this segment's timestamp is earlier than `other`'s last event (or
    ///   earlier than `other`'s timestamp when `other` has no events).
    pub fn prepend(&mut self, other: Self) -> Result<()> {
        // Build the error lazily so the success path does not construct it.
        self.events
            .len()
            .checked_add(other.events.len())
            .ok_or_else(|| anyhow!("Cannot merge segments exceeding usize"))?;
        let other_latest = other
            .events
            .last()
            .map(|e| e.get_time())
            .unwrap_or(other.get_time());
        if self.get_time() < other_latest {
            bail!("Cannot prepend another segment if this one predates it");
        }
        self.prepend_unchecked(other);
        Ok(())
    }

    /// Merges `other` in front of this segment without validating order.
    ///
    /// After the call, the event list is `other`'s events followed by this
    /// segment's events, and the timestamp is `other`'s. This segment's
    /// projection is left unchanged; `other`'s projection is dropped.
    pub fn prepend_unchecked(&mut self, mut other: Self) {
        other.events.append(&mut self.events);
        self.events = other.events;
        self.timestamp = other.timestamp;
    }

    /// Returns the events logged by this segment, in stored order.
    pub(super) fn get_events(&self) -> &Vec<Event<'a, T>> {
        &self.events
    }
}