// holochain_types 0.1.0
//
// Holochain common types
// Documentation
//! # Database Cache
//! This is an in-memory cache that is used to store the state of the DHT database.

use crate::dht_op::DhtOpType;
use crate::share::RwShare;
use error::*;
use holo_hash::*;
use holochain_sqlite::prelude::*;
use rusqlite::named_params;
use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::sync::Arc;

#[cfg(test)]
mod tests;

#[allow(missing_docs)]
pub mod error;

#[derive(Clone)]
/// In-memory cache for selected DHT database queries that are too slow
/// to run frequently.
///
/// The queries are evaluated lazily on first use and their results cached.
/// After that the cache is updated in memory without needing to go back
/// to the database.
///
/// Clones share the same activity cell, because it is held in an [`Arc`].
pub struct DhtDbQueryCache {
    /// The database this is caching queries for.
    dht_db: DbRead<DbKindDht>,
    /// The cache of agent activity queries, initialized at most once.
    activity: Arc<tokio::sync::OnceCell<ActivityCache>>,
}

/// Shared map from each agent to that agent's [`ActivityState`].
type ActivityCache = RwShare<HashMap<Arc<AgentPubKey>, ActivityState>>;

#[derive(Default, Debug, Clone, PartialEq, Eq)]
/// The state of an author's activity according to this authority.
pub struct ActivityState {
    /// The bounds of integrated and ready to integrate activity.
    pub bounds: ActivityBounds,
    /// Any activity that is ready to be integrated but is waiting
    /// for one or more upstream chain items to be marked ready before it can
    /// be integrated.
    /// This is an ordered sparse set: it is kept sorted ascending after
    /// every push.
    pub awaiting_deps: Vec<u32>,
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
/// The state of an agent's activity.
///
/// Both fields default to `None`, meaning nothing is integrated and
/// nothing is ready to integrate.
pub struct ActivityBounds {
    /// The highest agent activity action sequence that is already integrated.
    pub integrated: Option<u32>,
    /// The highest consecutive action sequence number that is ready to integrate.
    pub ready_to_integrate: Option<u32>,
}

impl std::ops::Deref for ActivityState {
    type Target = ActivityBounds;

    fn deref(&self) -> &Self::Target {
        &self.bounds
    }
}

#[cfg(any(test, feature = "test_utils"))]
impl DhtDbQueryCache {
    /// Get the cache's internal state for testing.
    ///
    /// # Panics
    ///
    /// Panics if the activity cache cannot be initialized.
    pub async fn get_state(&self) -> &ActivityCache {
        let initialized = self.get_or_try_init().await;
        initialized.unwrap()
    }
}

impl DhtDbQueryCache {
    /// Create a new cache for dht database queries.
    pub fn new(dht_db: DbRead<DbKindDht>) -> Self {
        Self {
            dht_db,
            activity: Default::default(),
        }
    }

    /// Lazily initiate the activity cache.
    async fn get_or_try_init(&self) -> DatabaseResult<&ActivityCache> {
        self.activity
            .get_or_try_init(|| {
                let db = self.dht_db.clone();
                async move {
                    let (activity_integrated, mut all_activity) = db
                        .async_reader(|txn| {
                            // Get the highest integrated sequence number for each agent.
                            let activity_integrated: Vec<(AgentPubKey, u32)> = txn
                            .prepare_cached(
                                holochain_sqlite::sql::sql_cell::ACTIVITY_INTEGRATED_UPPER_BOUND,
                            )?
                            .query_map(
                                named_params! {
                                    ":register_activity": DhtOpType::RegisterAgentActivity,
                                },
                                |row| {
                                    Ok((
                                        row.get::<_, Option<AgentPubKey>>(0)?,
                                        row.get::<_, Option<u32>>(1)?,
                                    ))
                                },
                            )?
                            .filter_map(|r| match r {
                                Ok((a, seq)) => Some(Ok((a?, seq?))),
                                Err(e) => Some(Err(e)),
                            })
                            .collect::<rusqlite::Result<Vec<_>>>()?;

                            // Get all the agents that have activity ready to be integrated.
                            let all_activity_agents: Vec<Arc<AgentPubKey>> = txn
                                .prepare_cached(
                                    holochain_sqlite::sql::sql_cell::ALL_ACTIVITY_AUTHORS,
                                )?
                                .query_map(
                                    named_params! {
                                        ":register_activity": DhtOpType::RegisterAgentActivity,
                                    },
                                    |row| Ok(Arc::new(row.get::<_, AgentPubKey>(0)?)),
                                )?
                                .collect::<rusqlite::Result<Vec<_>>>()?;

                            // Any agent activity that is currently ready to be integrated.
                            let mut any_ready_activity: HashMap<Arc<AgentPubKey>, ActivityState> =
                                HashMap::with_capacity(all_activity_agents.len());
                            let mut stmt = txn.prepare_cached(
                                holochain_sqlite::sql::sql_cell::ALL_READY_ACTIVITY,
                            )?;

                            // For each agent with activity that is ready to be integrated gather all
                            // the chain items and add them to the `awaiting_deps` list.
                            for author in all_activity_agents {
                                let awaiting_deps = stmt
                                    .query_map(
                                        named_params! {
                                            ":register_activity": DhtOpType::RegisterAgentActivity,
                                            ":author": author,
                                        },
                                        |row| row.get::<_, u32>(0),
                                    )?
                                    .collect::<rusqlite::Result<Vec<_>>>()?;
                                let state = ActivityState {
                                    awaiting_deps,
                                    ..Default::default()
                                };
                                any_ready_activity.insert(author, state);
                            }

                            DatabaseResult::Ok((activity_integrated, any_ready_activity))
                        })
                        .await?;

                    // Update the activity with the integrated sequence numbers.
                    for (agent, i) in activity_integrated {
                        let state = all_activity.entry(Arc::new(agent)).or_default();
                        state.bounds.integrated = Some(i);
                    }

                    // Now for each agent we update their activity so that any chain items
                    // that are ready to integrate are moved out of the `awaiting_deps` list.
                    for state in all_activity.values_mut() {
                        update_ready_to_integrate(state, None);
                    }

                    Ok(RwShare::new(all_activity))
                }
            })
            .await
    }

    /// Get any activity that is ready to be integrated.
    /// This returns a range of activity that is ready to be integrated
    /// for each agent.
    pub async fn get_activity_to_integrate(
        &self,
    ) -> DatabaseResult<Vec<(Arc<AgentPubKey>, RangeInclusive<u32>)>> {
        Ok(self.get_or_try_init().await?.share_ref(|activity| {
            activity
                .iter()
                .filter_map(|(agent, ActivityState { bounds, .. })| {
                    // If there is anything ready to integrated then it will be the end of the range.
                    let ready_to_integrate = bounds.ready_to_integrate?;

                    // The start of the range will be one more then the last integrated item or
                    // if there is nothing integrated then the start will be also the ready_to_integrate.
                    // This is why we use an inclusive range.
                    let start = bounds
                        .integrated
                        .and_then(|i| i.checked_add(1))
                        .filter(|i_prime| *i_prime <= ready_to_integrate)
                        .unwrap_or(ready_to_integrate);
                    Some((agent.clone(), start..=ready_to_integrate))
                })
                .collect()
        }))
    }

    /// Is the SourceChain empty for this [`AgentPubKey`]?
    pub async fn is_chain_empty(&self, author: &AgentPubKey) -> DatabaseResult<bool> {
        Ok(self.get_or_try_init().await?.share_ref(|activity| {
            activity
                .get(author)
                .map_or(true, |state| state.bounds.integrated.is_none())
        }))
    }

    /// Mark agent activity as actually integrated.
    pub async fn set_all_activity_to_integrated(
        &self,
        integrated_activity: Vec<(Arc<AgentPubKey>, RangeInclusive<u32>)>,
    ) -> DbCacheResult<()> {
        self.get_or_try_init().await?.share_mut(|activity| {
            let mut new_bounds = ActivityBounds::default();

            // For each authors activity run the activity check then update the activity state.
            for (author, seq_range) in integrated_activity {
                let prev_bounds = activity.get_mut(author.as_ref());

                // Set the new bounds to the start of this range for the check.
                new_bounds.integrated = Some(*seq_range.start());

                // Check that it makes sense to integrate the first activity in this range.
                if !update_activity_check(prev_bounds.as_deref().map(|p| &p.bounds), &new_bounds) {
                    return Err(DbCacheError::ActivityOutOfOrder(
                        prev_bounds.and_then(|p| p.integrated).unwrap_or(0),
                        new_bounds.integrated.unwrap_or(0),
                    ));
                }

                // Because ranges are sequential we know by induction that the last activity makes sense to add.
                // Update the bounds to the end of this range.
                new_bounds.integrated = Some(*seq_range.end());

                // If there is previous bounds then update the bounds.
                match prev_bounds {
                    Some(prev_bounds) => update_activity_inner(prev_bounds, &new_bounds),
                    None => {
                        // Otherwise insert the new state.
                        activity.insert(
                            author,
                            ActivityState {
                                bounds: new_bounds,
                                ..Default::default()
                            },
                        );
                    }
                }
            }
            Ok(())
        })
    }

    /// Set activity to ready to integrate.
    pub async fn set_activity_ready_to_integrate(
        &self,
        agent: &AgentPubKey,
        action_sequence: u32,
    ) -> DbCacheResult<()> {
        self.new_activity_inner(
            agent,
            ActivityBounds {
                ready_to_integrate: Some(action_sequence),
                ..Default::default()
            },
        )
        .await
    }

    /// Set activity to to integrated.
    pub async fn set_activity_to_integrated(
        &self,
        agent: &AgentPubKey,
        action_sequence: u32,
    ) -> DbCacheResult<()> {
        self.new_activity_inner(
            agent,
            ActivityBounds {
                integrated: Some(action_sequence),
                ..Default::default()
            },
        )
        .await
    }

    /// Add an authors activity.
    async fn new_activity_inner(
        &self,
        agent: &AgentPubKey,
        new_bounds: ActivityBounds,
    ) -> DbCacheResult<()> {
        self.get_or_try_init()
            .await?
            .share_mut(|activity| update_activity(activity, agent, &new_bounds))
    }
}

/// Decide whether `new_bounds` may be applied on top of `prev_bounds`.
///
/// Both rules must hold: the first integrated item must be sequence zero,
/// and a newly integrated item must follow on from the previous one.
fn update_activity_check(
    prev_bounds: Option<&ActivityBounds>,
    new_bounds: &ActivityBounds,
) -> bool {
    if !prev_is_empty_new_is_zero(prev_bounds, new_bounds) {
        return false;
    }
    integrated_is_consecutive(prev_bounds, new_bounds)
}

/// Prev integrated is empty and new integrated is empty or set to zero
fn prev_is_empty_new_is_zero(
    prev_bounds: Option<&ActivityBounds>,
    new_bounds: &ActivityBounds,
) -> bool {
    prev_bounds.map_or(false, |p| p.integrated.is_some())
        || new_bounds.integrated.map_or(true, |i| i == 0)
}

/// When both the previous and the new bounds carry an integrated sequence,
/// the new one must be the same as, or exactly one more than, the previous.
/// If either side has no integrated sequence the rule passes.
fn integrated_is_consecutive(
    prev_bounds: Option<&ActivityBounds>,
    new_bounds: &ActivityBounds,
) -> bool {
    let prev_integrated = prev_bounds.and_then(|p| p.integrated);
    match (prev_integrated, new_bounds.integrated) {
        // `checked_add` yields `None` at `u32::MAX`, which never equals `Some(next)`.
        (Some(prev), Some(next)) => prev == next || prev.checked_add(1) == Some(next),
        _ => true,
    }
}

/// Apply `new_bounds` to the cached state for `agent`, creating the entry
/// if the agent is not in the map yet.
///
/// Returns [`DbCacheError::ActivityOutOfOrder`] when the new integrated
/// sequence does not pass [`update_activity_check`] against the current state.
fn update_activity(
    activity: &mut HashMap<Arc<AgentPubKey>, ActivityState>,
    agent: &AgentPubKey,
    new_bounds: &ActivityBounds,
) -> DbCacheResult<()> {
    let existing = activity.get_mut(agent);

    let allowed = update_activity_check(existing.as_deref().map(|s| &s.bounds), new_bounds);
    if !allowed {
        let prev_seq = existing.and_then(|p| p.integrated).unwrap_or(0);
        let new_seq = new_bounds.integrated.unwrap_or(0);
        return Err(DbCacheError::ActivityOutOfOrder(prev_seq, new_seq));
    }

    // Known agent: merge into the existing state.
    if let Some(state) = existing {
        update_activity_inner(state, new_bounds);
        return Ok(());
    }

    // Unknown agent: build a fresh state.
    let fresh = match new_bounds.ready_to_integrate {
        // A non-zero ready sequence with no prior state is still waiting on
        // its dependencies, so it goes into `awaiting_deps` instead of the bounds.
        Some(seq) if seq != 0 => ActivityState {
            bounds: ActivityBounds {
                integrated: new_bounds.integrated,
                ready_to_integrate: None,
            },
            awaiting_deps: vec![seq],
        },
        _ => ActivityState {
            bounds: *new_bounds,
            awaiting_deps: Vec::new(),
        },
    };
    activity.insert(Arc::new(agent.clone()), fresh);
    Ok(())
}

/// Merge `new_bounds` into an existing state: overwrite the integrated
/// sequence when one is supplied, then re-evaluate what is ready to integrate.
fn update_activity_inner(prev_state: &mut ActivityState, new_bounds: &ActivityBounds) {
    if let Some(seq) = new_bounds.integrated {
        prev_state.bounds.integrated = Some(seq);
    }
    let new_ready = new_bounds.ready_to_integrate;
    update_ready_to_integrate(prev_state, new_ready);
}

/// Updates the ready to integrate state of an activity.
/// This function is a bit complex but is heavily tested and maintains the
/// invariant that chain activity can only be set to ready if it makes sense to.
///
/// It runs in three phases:
/// 1. If `new_ready` is given, either advance `ready_to_integrate` to it or
///    park it in `awaiting_deps`.
/// 2. If the front of `awaiting_deps` now directly follows the current
///    position, absorb the leading consecutive run into `ready_to_integrate`.
/// 3. If `ready_to_integrate` equals `integrated`, clear `ready_to_integrate`.
fn update_ready_to_integrate(prev_state: &mut ActivityState, new_ready: Option<u32>) {
    // There is a new chain item that is ready for integration.
    if let Some(new_ready) = new_ready {
        match prev_state {
            // Nothing is integrated or currently ready to integrate but there could
            // be other chain items that are awaiting dependencies.
            ActivityState {
                bounds:
                    ActivityBounds {
                        integrated: None,
                        ready_to_integrate: ready @ None,
                    },
                awaiting_deps,
            } => {
                // (0) -> Ready(0)
                //
                // If we have no state and new_ready is zero
                // then the new ready_to_integrate is set to zero.
                if new_ready == 0 {
                    *ready = Some(new_ready);
                // (x) -> Out(x) where x > 0
                //
                // If new_ready is not zero then it is added to awaiting_deps.
                } else {
                    awaiting_deps.push(new_ready);
                    awaiting_deps.sort_unstable();
                }
            }
            // There are existing chain items that are ready to integrate.
            ActivityState {
                bounds:
                    ActivityBounds {
                        integrated: _,
                        ready_to_integrate: Some(x),
                    },
                awaiting_deps,
            } => {
                // (Ready(x), x') -> Ready(x')
                //
                // If ready_to_integrate + 1 == new_ready then we know this
                // new ready is consecutive from the previous ready_to_integrate.
                if x.checked_add(1)
                    .map_or(false, |x_prime| x_prime == new_ready)
                {
                    let check_awaiting_deps =
                        |x_prime_prime| awaiting_deps.first().map(|first| x_prime_prime == *first);
                    // (Ready(x), Out(x''..=y), x') -> Ready(y)
                    // (Ready(x), Out(x''..=y, z..), x') -> (Ready(y), Out(z..))
                    //
                    // If new_ready fills the gap between ready_to_integrate and the
                    // first sequence in awaiting_deps then we make the end of the sequence
                    // the new ready_to_integrate.
                    if x.checked_add(2)
                        .and_then(check_awaiting_deps)
                        .unwrap_or(false)
                    {
                        if let Some(y) = find_consecutive(awaiting_deps) {
                            *x = y;
                        }
                    } else {
                        *x = new_ready;
                    }
                } else {
                    // The new ready chain item is not consecutive from the current
                    // ready so we add it to awaiting_deps.
                    awaiting_deps.push(new_ready);
                    awaiting_deps.sort_unstable();
                }
            }
            // There is an existing chain item that is integrated but
            // nothing is currently ready to integrate.
            ActivityState {
                bounds:
                    ActivityBounds {
                        integrated: Some(x),
                        ready_to_integrate: ready @ None,
                    },
                awaiting_deps,
            } => {
                // (Integrated(x), x') -> (Integrated(x), Ready(x'))
                //
                // If the new ready is consecutive from the integrated then we
                // can set the new ready_to_integrate to the new ready.
                if x.checked_add(1)
                    .map_or(false, |x_prime| x_prime == new_ready)
                {
                    *ready = Some(new_ready);
                // (Integrated(x), a) -> (Integrated(x), Out(a)) where a != x'
                //
                // The new ready is not consecutive from the integrated so we add
                // it to awaiting_deps.
                } else {
                    awaiting_deps.push(new_ready);
                    awaiting_deps.sort_unstable();
                }
            }
        }
    }
    // Now that ready_to_integrate and awaiting_deps have been updated for any
    // new_ready, check whether there is now a new consecutive sequence.
    // This also runs when new_ready is None.
    match prev_state {
        // Check if there is a consecutive sequence from ready_to_integrate to awaiting_deps.
        ActivityState {
            bounds:
                ActivityBounds {
                    ready_to_integrate: Some(x),
                    ..
                },
            awaiting_deps,
        } => {
            if x.checked_add(1)
                .and_then(|x_prime| awaiting_deps.first().map(|first| x_prime == *first))
                .unwrap_or(false)
            {
                if let Some(y) = find_consecutive(awaiting_deps) {
                    *x = y;
                }
            }
        }
        // If there is no ready_to_integrate then
        // check if there is a consecutive sequence from integrated to awaiting_deps.
        ActivityState {
            bounds:
                ActivityBounds {
                    integrated: Some(x),
                    ready_to_integrate: ready @ None,
                },
            awaiting_deps,
        } => {
            if x.checked_add(1)
                .and_then(|x_prime| awaiting_deps.first().map(|first| x_prime == *first))
                .unwrap_or(false)
            {
                if let Some(y) = find_consecutive(awaiting_deps) {
                    *ready = Some(y);
                }
            }
        }
        // Check if there is a zero in the awaiting deps.
        // This should not happen but is here for robustness.
        ActivityState {
            bounds:
                ActivityBounds {
                    integrated: None,
                    ready_to_integrate: ready @ None,
                },
            awaiting_deps,
        } => {
            if awaiting_deps.first().map_or(false, |first| *first == 0) {
                if let Some(y) = find_consecutive(awaiting_deps) {
                    *ready = Some(y);
                }
            }
        }
    }

    // Now that ready_to_integrate and awaiting_deps are updated, if
    // integrated is the same as ready_to_integrate then that
    // chain item was integrated so there is no longer a ready_to_integrate.
    if prev_state
        .bounds
        .integrated
        .and_then(|i| prev_state.ready_to_integrate.map(|r| i == r))
        .unwrap_or(false)
    {
        prev_state.bounds.ready_to_integrate = None;
    }
}

/// Remove the leading consecutive run from `awaiting_deps` and return the
/// last sequence number of that run.
///
/// Out(x..=y)      -> (y)
/// Out(x..=y, z..) -> (Out(z..), y)
///
/// `awaiting_deps` must be sorted ascending. A repeated sequence number is
/// treated as part of the run, so a duplicate entry cannot cut the run short
/// and strand the items after it. Returns `None` and leaves the vector
/// untouched when it is empty.
fn find_consecutive(awaiting_deps: &mut Vec<u32>) -> Option<u32> {
    if awaiting_deps.is_empty() {
        return None;
    }

    // Count the neighbouring pairs at the front that continue the run: the
    // next item is either a duplicate or exactly one more. `checked_add`
    // yields `None` at `u32::MAX`, which never matches, so the run ends there.
    let run_len = 1 + awaiting_deps
        .windows(2)
        .take_while(|pair| pair[1] == pair[0] || pair[0].checked_add(1) == Some(pair[1]))
        .count();

    let end_of_run = awaiting_deps.get(run_len - 1).copied();

    // Drop the consecutive seqs and give back the memory they used.
    drop(awaiting_deps.drain(..run_len));
    awaiting_deps.shrink_to_fit();

    end_of_run
}

/// Builder-style helpers for constructing expected states in tests.
#[cfg(test)]
impl ActivityState {
    /// An empty state: nothing integrated, nothing ready, nothing awaiting.
    fn new() -> Self {
        Default::default()
    }
    /// Set the integrated sequence to `i`.
    fn integrated(self, i: u32) -> Self {
        Self {
            bounds: ActivityBounds {
                integrated: Some(i),
                ..self.bounds
            },
            ..self
        }
    }
    /// Set the ready to integrate sequence to `i`.
    fn ready(self, i: u32) -> Self {
        Self {
            bounds: ActivityBounds {
                ready_to_integrate: Some(i),
                ..self.bounds
            },
            ..self
        }
    }
    /// Replace the awaiting dependencies with `i`.
    fn awaiting(self, i: Vec<u32>) -> Self {
        Self {
            awaiting_deps: i,
            ..self
        }
    }
}

impl From<DbRead<DbKindDht>> for DhtDbQueryCache {
    fn from(db: DbRead<DbKindDht>) -> Self {
        Self::new(db)
    }
}

impl From<DbWrite<DbKindDht>> for DhtDbQueryCache {
    fn from(db: DbWrite<DbKindDht>) -> Self {
        Self::new(db.into())
    }
}