//! arcon_state 0.2.1
//!
//! State management features for Arcon.
pub mod handles;
#[macro_use]
pub mod macros;
pub mod ops;
pub mod serialization;

#[cfg(test)]
#[macro_use]
pub mod test_common;

pub use crate::{
    handles::Handle,
    ops::{AggregatorOps, MapOps, ReducerOps, ValueOps, VecOps},
};

use crate::{
    data::{Key, Metakey, Value},
    error::*,
};
use std::{
    any,
    collections::{BTreeSet, HashMap},
    fmt,
    fmt::{Debug, Formatter},
    fs,
    marker::PhantomData,
    path::{Path, PathBuf},
};

/// Filesystem layout and membership information consumed by
/// [`Backend::restore_or_create`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// Base directory for live state; each backend uses the subdirectory named after its id.
    pub live_state_base_path: PathBuf,
    /// Directory holding checkpoint subdirectories named `checkpoint_<backend id>_<epoch>`.
    pub checkpoints_base_path: PathBuf,
    /// Ids of all backends; an epoch counts as a complete checkpoint only when every id
    /// listed here has a checkpoint for it.
    pub backend_ids: Vec<String>,
}

/// A state backend: storage that supports value, map, vec, reducer and aggregator state
/// and can be checkpointed and restored.
pub trait Backend:
    ValueOps + MapOps + VecOps + ReducerOps + AggregatorOps + Send + Sync + 'static
{
    /// Restores the backend named `id` from its latest *complete* checkpoint, or creates a
    /// fresh one when no complete checkpoint exists.
    ///
    /// 1. Any existing live-state directory `config.live_state_base_path/<id>` is removed.
    /// 2. Every entry of `config.checkpoints_base_path` must be a directory named
    ///    `checkpoint_<backend id>_<epoch>`, where `<backend id>` is one of
    ///    `config.backend_ids` (it may itself contain underscores) and `<epoch>` parses as
    ///    a `u64`.
    /// 3. An epoch is complete when every id in `config.backend_ids` has a checkpoint for
    ///    it. The highest complete epoch is restored via [`Backend::restore`] from
    ///    `checkpoint_<id>_<epoch>`; otherwise (including when `backend_ids` is empty)
    ///    [`Backend::create`] is called.
    ///
    /// # Errors
    ///
    /// - Errors from removing the old live state or from reading
    ///   `config.checkpoints_base_path` (`fs::read_dir` also fails if it does not exist).
    /// - `InvalidPath` if a checkpoint-directory entry is not a directory, its name is not
    ///   valid UTF-8, or it does not match `checkpoint_<backend id>_<epoch>`.
    /// - `UnknownNode` if a checkpoint names a backend id not in `config.backend_ids`.
    /// - Whatever [`Backend::restore`] or [`Backend::create`] return.
    fn restore_or_create(config: &Config, id: String) -> Result<Self>
    where
        Self: Sized,
    {
        // Used both to parse existing checkpoint names and to build the restore path
        // below, so the two cannot drift apart.
        const CHECKPOINT_PREFIX: &str = "checkpoint_";

        // TODO: make IO errors have more context
        let mut state_path = config.live_state_base_path.clone();
        state_path.push(&id);
        if state_path.exists() {
            fs::remove_dir_all(&state_path)?;
        }

        // backend id -> epochs for which that backend has a checkpoint directory
        let mut checkpoints: HashMap<&str, BTreeSet<u64>> = config
            .backend_ids
            .iter()
            .map(|backend_id| (backend_id.as_str(), BTreeSet::new()))
            .collect();

        for directory in fs::read_dir(&config.checkpoints_base_path)? {
            let directory = directory?;

            let invalid_path = || InvalidPath {
                path: directory.path(),
            };

            let dir_name = directory.file_name();
            let dir_name = dir_name.to_str().with_context(invalid_path)?;

            ensure!(
                dir_name.starts_with(CHECKPOINT_PREFIX)
                    && directory.metadata().map(|m| m.is_dir()).unwrap_or(false),
                InvalidPath {
                    path: directory.path()
                }
            );

            let dir_name = &dir_name[CHECKPOINT_PREFIX.len()..];
            // Split at the LAST underscore only: the epoch is a plain integer, but backend
            // ids may contain underscores (`checkpoint_my_op_3` -> id `my_op`, epoch 3).
            // Splitting on every underscore would make such checkpoints unparseable.
            let mut dir_name_parts = dir_name.rsplitn(2, '_');
            let epoch: u64 = dir_name_parts
                .next()
                .with_context(invalid_path)?
                .parse()
                .ok()
                .with_context(invalid_path)?;
            let backend_id = dir_name_parts.next().with_context(invalid_path)?;

            let checkpoints_for_id =
                checkpoints
                    .get_mut(backend_id)
                    .with_context(|| UnknownNode {
                        unknown_node: backend_id.to_string(),
                        known_nodes: config.backend_ids.clone(),
                    })?;

            checkpoints_for_id.insert(epoch);
        }

        // An epoch is complete only if *every* backend checkpointed it, so intersect the
        // per-backend epoch sets. No backends at all means no complete checkpoint.
        let mut epoch_sets = checkpoints.into_iter().map(|(_, epochs)| epochs);
        let complete_checkpoints = epoch_sets.next().map(|first| {
            epoch_sets.fold(first, |complete, other| {
                complete.intersection(&other).copied().collect()
            })
        });

        // BTreeSet iterates in ascending order, so its last element is the newest epoch.
        let last_complete_checkpoint =
            complete_checkpoints.and_then(|complete| complete.iter().next_back().copied());

        match last_complete_checkpoint {
            Some(epoch) => {
                let latest_checkpoint_path = config
                    .checkpoints_base_path
                    .join(format!("{}{}_{}", CHECKPOINT_PREFIX, id, epoch));

                Self::restore(&state_path, &latest_checkpoint_path, id)
            }
            None => Self::create(&state_path, id),
        }
    }

    /// Returns this backend's name (presumably the `name` it was created/restored with).
    fn name(&self) -> &str;

    /// Creates a new, empty backend named `name` whose live state lives at `live_path`.
    ///
    /// `restore_or_create` calls this with `live_path = live_state_base_path/<name>` after
    /// removing any previous directory there.
    fn create(live_path: &Path, name: String) -> Result<Self>
    where
        Self: Sized;
    /// Creates a backend named `name` with live state at `live_path`, initialised from the
    /// checkpoint stored at `checkpoint_path`.
    fn restore(live_path: &Path, checkpoint_path: &Path, name: String) -> Result<Self>
    where
        Self: Sized;

    /// Whether this backend was obtained via [`Backend::restore`] (assumed from the name —
    /// confirm against implementations).
    fn was_restored(&self) -> bool;

    /// Writes a checkpoint of the current state to `checkpoint_path`.
    ///
    /// NOTE(review): `restore_or_create` only finds checkpoints stored as
    /// `<checkpoints_base_path>/checkpoint_<name>_<epoch>`; callers are presumably expected
    /// to pass such a path.
    fn checkpoint(&self, checkpoint_path: &Path) -> Result<()>;

    // region handle registration
    // Each `register_*` method hooks a handle of the matching state kind into this backend;
    // the handle is passed mutably so the backend may update it.
    fn register_value_handle<T: Value, IK: Metakey, N: Metakey>(
        &self,
        handle: &mut Handle<ValueState<T>, IK, N>,
    );
    fn register_map_handle<K: Key, V: Value, IK: Metakey, N: Metakey>(
        &self,
        handle: &mut Handle<MapState<K, V>, IK, N>,
    );
    fn register_vec_handle<T: Value, IK: Metakey, N: Metakey>(
        &self,
        handle: &mut Handle<VecState<T>, IK, N>,
    );
    fn register_reducer_handle<T: Value, F: Reducer<T>, IK: Metakey, N: Metakey>(
        &self,
        handle: &mut Handle<ReducerState<T, F>, IK, N>,
    );
    fn register_aggregator_handle<A: Aggregator, IK: Metakey, N: Metakey>(
        &self,
        handle: &mut Handle<AggregatorState<A>, IK, N>,
    );
    // endregion
}

/// Type-level description of a kind of state (value, map, vec, reducer, aggregator).
///
/// The implementors in this module are `PhantomData`-only markers. `ExtraData` is the
/// additional runtime value a state kind needs (`()` when it needs none).
pub trait StateType
where
    Self: Default,
{
    /// Extra runtime data associated with this state kind.
    type ExtraData;
}

/// Marker for value state over `T`.
///
/// Carries no runtime data (only `PhantomData`); it lets handles and backends be
/// specialised on the state kind at the type level.
#[derive(Debug)]
pub struct ValueState<T: Value>(PhantomData<T>);

impl<T: Value> Default for ValueState<T> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<T: Value> StateType for ValueState<T> {
    // Value state needs no extra runtime data.
    type ExtraData = ();
}

/// Marker for map state from keys `K` to values `V`.
///
/// Carries no runtime data (only `PhantomData`).
#[derive(Debug)]
pub struct MapState<K: Key, V: Value>(PhantomData<(K, V)>);

impl<K: Key, V: Value> Default for MapState<K, V> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<K: Key, V: Value> StateType for MapState<K, V> {
    // Map state needs no extra runtime data.
    type ExtraData = ();
}

/// Marker for vec state with elements of type `T`.
///
/// Carries no runtime data (only `PhantomData`).
#[derive(Debug)]
pub struct VecState<T: Value>(PhantomData<T>);

impl<T: Value> Default for VecState<T> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<T: Value> StateType for VecState<T> {
    // Vec state needs no extra runtime data.
    type ExtraData = ();
}

/// A reduction function combining two `&T` into a new `T`.
///
/// Blanket-implemented below for every function or closure with that signature that is
/// also `Send + Sync + Clone + 'static`, so users never implement it by hand.
pub trait Reducer<T>: Fn(&T, &T) -> T + Send + Sync + Clone + 'static {}

impl<T, R: Fn(&T, &T) -> T + Send + Sync + Clone + 'static> Reducer<T> for R {}

/// Marker for reducer state over values `T` combined with the reducer `F`.
///
/// Carries no runtime data itself; the reducer is supplied as the `ExtraData`.
#[derive(Debug)]
pub struct ReducerState<T: Value, F: Reducer<T>>(PhantomData<(T, F)>);

impl<T: Value, F: Reducer<T>> Default for ReducerState<T, F> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<T: Value, F: Reducer<T>> StateType for ReducerState<T, F> {
    // The reducer function itself is the extra runtime data.
    type ExtraData = F;
}

/// A user-defined aggregation.
///
/// `Input` values are added into an `Accumulator`, accumulators can be merged pairwise,
/// and an accumulator is finally converted into a `Result`.
pub trait Aggregator
where
    Self: Send + Sync + Clone + 'static,
{
    /// Type of the values fed into the aggregation.
    type Input: Value;
    /// Intermediate state of the aggregation.
    type Accumulator: Value;
    /// Final output of the aggregation.
    type Result;

    /// Returns a new accumulator (presumably the empty/identity state — confirm per impl).
    fn create_accumulator(&self) -> Self::Accumulator;
    /// Adds `value` into `acc` in place.
    fn add(&self, acc: &mut Self::Accumulator, value: Self::Input);
    /// Combines two accumulators into one.
    fn merge_accumulators(
        &self,
        fst: Self::Accumulator,
        snd: Self::Accumulator,
    ) -> Self::Accumulator;
    /// Consumes `acc` and produces the aggregation's final result.
    fn accumulator_into_result(&self, acc: Self::Accumulator) -> Self::Result;
}

/// Marker for aggregator state driven by the aggregator `A`.
///
/// Carries no runtime data itself; the aggregator is supplied as the `ExtraData`.
#[derive(Debug)]
pub struct AggregatorState<A: Aggregator>(PhantomData<A>);

impl<A: Aggregator> Default for AggregatorState<A> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<A: Aggregator> StateType for AggregatorState<A> {
    // The aggregator value itself is the extra runtime data.
    type ExtraData = A;
}

#[cfg(feature = "rocks")]
pub mod rocks;
#[cfg(feature = "rocks")]
pub use self::rocks::Rocks;

#[cfg(feature = "metrics")]
pub mod metrics_utils;
#[cfg(feature = "sled")]
pub mod sled;

#[cfg(feature = "sled")]
pub use self::sled::Sled;

/// The kinds of state backend compiled into this build.
///
/// Each variant only exists when its cargo feature is enabled; with no backend feature
/// the enum has no variants at all.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendType {
    /// Backend implemented in the `rocks` module (feature `rocks`).
    #[cfg(feature = "rocks")]
    Rocks,
    /// Backend implemented in the `sled` module (feature `sled`).
    #[cfg(feature = "sled")]
    Sled,
}

/// Displays a backend type by its variant name (identical to its `Debug` output).
impl fmt::Display for BackendType {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_fmt(format_args!("{:?}", self))
    }
}

impl BackendType {
    pub const VARIANTS: &'static [BackendType] = {
        use BackendType::*;
        &[
            #[cfg(feature = "rocks")]
            Rocks,
            #[cfg(feature = "sled")]
            Sled,
        ]
    };

    pub const STR_VARIANTS: &'static [&'static str] = {
        &[
            #[cfg(feature = "rocks")]
            "Rocks",
            #[cfg(feature = "sled")]
            "Sled",
        ]
    };
}

use std::str::FromStr;
impl FromStr for BackendType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        use BackendType::*;
        match s {
            #[cfg(feature = "rocks")]
            x if x.eq_ignore_ascii_case("Rocks") => Ok(Rocks),
            #[cfg(feature = "sled")]
            x if x.eq_ignore_ascii_case("Sled") => Ok(Sled),
            _ => Err(format!(
                "valid values: {}",
                BackendType::VARIANTS
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<_>>()
                    .join(", ")
            )),
        }
    }
}

/// Default backend type: `Sled` when the `sled` feature is enabled.
///
/// `BackendType::Sled` only exists with the `sled` feature, so an unconditional
/// `BackendType::Sled` fails to compile in rocks-only builds; those fall back to `Rocks`
/// (see the second impl). With no backend feature enabled the enum has no variants, so
/// no `Default` impl is provided.
#[cfg(feature = "sled")]
impl Default for BackendType {
    fn default() -> Self {
        BackendType::Sled
    }
}

// Rocks-only builds: `Sled` does not exist, so `Rocks` is the only possible default.
#[cfg(all(feature = "rocks", not(feature = "sled")))]
impl Default for BackendType {
    fn default() -> Self {
        BackendType::Rocks
    }
}