kitsune2_showcase 0.5.0-dev.1

kitsune2 p2p / dht showcase app
use super::file_data::FileData;
use bytes::Bytes;
use kitsune2_api::{
    BoxFut, Builder, Config, DhtArc, DynOpStore, DynOpStoreFactory, K2Error,
    K2Result, MetaOp, OpId, OpStore, OpStoreFactory, SpaceId, Timestamp,
};
use kitsune2_core::factories::{MemOpStoreFactory, MemoryOpRecord};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

pub type FileStoreLookup = Arc<Mutex<HashMap<String, OpId>>>;

#[derive(Debug)]
pub struct FileOpStoreFactory {
    mem_op_store_factory: Arc<MemOpStoreFactory>,
    file_store_lookup: FileStoreLookup,
}

impl FileOpStoreFactory {
    pub fn create(file_store_lookup: FileStoreLookup) -> DynOpStoreFactory {
        let out: DynOpStoreFactory = Arc::new(FileOpStoreFactory {
            mem_op_store_factory: Arc::new(MemOpStoreFactory {}),
            file_store_lookup,
        });
        out
    }
}

impl OpStoreFactory for FileOpStoreFactory {
    fn default_config(&self, config: &mut Config) -> K2Result<()> {
        self.mem_op_store_factory.default_config(config)
    }

    fn validate_config(&self, config: &Config) -> K2Result<()> {
        self.mem_op_store_factory.validate_config(config)
    }

    fn create(
        &self,
        builder: Arc<Builder>,
        space_id: SpaceId,
    ) -> BoxFut<'static, K2Result<DynOpStore>> {
        let mem_op_store_factory = self.mem_op_store_factory.clone();
        let file_store_lookup = self.file_store_lookup.clone();
        Box::pin(async move {
            let out: DynOpStore = Arc::new(FileOpStore {
                mem_op_store: mem_op_store_factory
                    .create(builder, space_id)
                    .await?,
                file_store_lookup,
            });
            Ok(out)
        })
    }
}

#[derive(Debug)]
struct FileOpStore {
    mem_op_store: DynOpStore,
    file_store_lookup: FileStoreLookup,
}

impl OpStore for FileOpStore {
    fn process_incoming_ops(
        &self,
        op_list: Vec<Bytes>,
    ) -> BoxFut<'_, K2Result<Vec<OpId>>> {
        Box::pin(async move {
            // Get a list of file names from the provided ops
            let file_names = op_list
                .iter()
                .map(|op| {
                    let mem_op = MemoryOpRecord::from(op.clone());
                     serde_json::from_slice::<FileData>(&mem_op.op_data)
                        .map(|f| f.name)
                })
                .collect::<Result<Vec<_>, _>>().map_err(|e| {
                K2Error::other_src("Failed to deserialize op data, are you using Kitsune2's `MemoryOp`?", e)
            })?;

            // Process the ops and add them the to in-memory op store,
            // returning the computed IDs of the passed ops
            let op_ids =
                self.mem_op_store.process_incoming_ops(op_list).await?;

            // Add the file names and their corresponding op IDs to the lookup
            self.file_store_lookup
                .lock()
                .expect("failed to get lock for file_store_lookup")
                .extend(file_names.into_iter().zip(op_ids.clone()));

            Ok(op_ids)
        })
    }

    fn retrieve_op_hashes_in_time_slice(
        &self,
        arc: DhtArc,
        start: Timestamp,
        end: Timestamp,
    ) -> BoxFut<'_, K2Result<(Vec<OpId>, u32)>> {
        self.mem_op_store
            .retrieve_op_hashes_in_time_slice(arc, start, end)
    }

    fn retrieve_ops(
        &self,
        op_ids: Vec<OpId>,
    ) -> BoxFut<'_, K2Result<Vec<MetaOp>>> {
        self.mem_op_store.retrieve_ops(op_ids)
    }

    fn filter_out_existing_ops(
        &self,
        op_ids: Vec<OpId>,
    ) -> BoxFut<'_, K2Result<Vec<OpId>>> {
        self.mem_op_store.filter_out_existing_ops(op_ids)
    }

    fn retrieve_op_ids_bounded(
        &self,
        arc: DhtArc,
        start: Timestamp,
        limit_bytes: u32,
    ) -> BoxFut<'_, K2Result<(Vec<OpId>, u32, Timestamp)>> {
        self.mem_op_store
            .retrieve_op_ids_bounded(arc, start, limit_bytes)
    }

    fn earliest_timestamp_in_arc(
        &self,
        arc: DhtArc,
    ) -> BoxFut<'_, K2Result<Option<Timestamp>>> {
        self.mem_op_store.earliest_timestamp_in_arc(arc)
    }

    fn store_slice_hash(
        &self,
        arc: DhtArc,
        slice_index: u64,
        slice_hash: Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        self.mem_op_store
            .store_slice_hash(arc, slice_index, slice_hash)
    }

    fn slice_hash_count(&self, arc: DhtArc) -> BoxFut<'_, K2Result<u64>> {
        self.mem_op_store.slice_hash_count(arc)
    }

    fn retrieve_slice_hash(
        &self,
        arc: DhtArc,
        slice_index: u64,
    ) -> BoxFut<'_, K2Result<Option<Bytes>>> {
        self.mem_op_store.retrieve_slice_hash(arc, slice_index)
    }

    fn retrieve_slice_hashes(
        &self,
        arc: DhtArc,
    ) -> BoxFut<'_, K2Result<Vec<(u64, Bytes)>>> {
        self.mem_op_store.retrieve_slice_hashes(arc)
    }

    fn query_total_op_count(&self) -> BoxFut<'_, K2Result<u64>> {
        self.mem_op_store.query_total_op_count()
    }
}