1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use tokio::sync::RwLock;
use parking_lot::RwLock as StdRwLock;
#[allow(unused_imports)]
use std::sync::mpsc as smpsc;
#[allow(unused_imports)]
use std::sync::{Weak, Arc};

use crate::session::{Session};

use super::meta::*;
use super::error::*;
use super::chain::*;
use super::pipe::*;
use super::trust::*;
use super::header::*;
#[allow(unused_imports)]
use super::event::*;
use super::lint::*;
use super::spec::*;
use super::index::*;

use bytes::Bytes;

/// Cloneable handle onto the state shared with a `Chain`.
///
/// Every heavyweight member sits behind an `Arc`, so cloning this handle only
/// bumps reference counts. The `where Self: Send + Sync` clause makes the
/// compiler reject any change that would stop the handle from being shareable
/// across threads.
#[derive(Clone)]
pub struct ChainMultiUser
where
    Self: Send + Sync,
{
    /// State guarded by the async (`tokio`) read/write lock; the guard may be
    /// held across `.await` points.
    pub(super) inside_async: Arc<RwLock<ChainProtectedAsync>>,
    /// State guarded by the blocking (`parking_lot`) read/write lock; used by
    /// the synchronous lint and transform methods.
    pub(super) inside_sync: Arc<StdRwLock<ChainProtectedSync>>,
    /// Shared event pipe taken from the originating chain.
    pub(super) pipe: Arc<dyn EventPipe>,
    /// Message format copied from the originating chain.
    pub(super) default_format: MessageFormat,
}

impl ChainMultiUser
{
    pub(crate) async fn new(accessor: &Chain) -> ChainMultiUser
    {
        ChainMultiUser {
            inside_async: Arc::clone(&accessor.inside_async),
            inside_sync: Arc::clone(&accessor.inside_sync),
            pipe: Arc::clone(&accessor.pipe),
            default_format: accessor.default_format,
        }
    }
 
    #[allow(dead_code)]
    pub async fn load(&self, leaf: EventLeaf) -> Result<LoadResult, LoadError> {
        self.inside_async.read().await.chain.load(leaf).await
    }

    #[allow(dead_code)]
    pub async fn load_many(&self, leafs: Vec<EventLeaf>) -> Result<Vec<LoadResult>, LoadError> {
        self.inside_async.read().await.chain.load_many(leafs).await
    }

    #[allow(dead_code)]
    pub async fn lookup_primary(&self, key: &PrimaryKey) -> Option<EventLeaf> {
        self.inside_async.read().await.chain.lookup_primary(key)
    }

    #[allow(dead_code)]
    pub async fn lookup_secondary(&self, key: &MetaCollection) -> Option<Vec<EventLeaf>> {
        self.inside_async.read().await.chain.lookup_secondary(key)
    }

    #[allow(dead_code)]
    pub(crate) fn metadata_lint_many<'a>(&self, lints: &Vec<LintData<'a>>, session: &Session) -> Result<Vec<CoreMetadata>, LintError> {
        let guard = self.inside_sync.read();
        let mut ret = Vec::new();
        for linter in guard.linters.iter() {
            ret.extend(linter.metadata_lint_many(lints, session)?);
        }
        for plugin in guard.plugins.iter() {
            ret.extend(plugin.metadata_lint_many(lints, session)?);
        }
        Ok(ret)
    }

    #[allow(dead_code)]
    pub(crate) fn metadata_lint_event(&self, meta: &mut Metadata, session: &Session) -> Result<Vec<CoreMetadata>, LintError> {
        let guard = self.inside_sync.read();
        let mut ret = Vec::new();
        for linter in guard.linters.iter() {
            ret.extend(linter.metadata_lint_event(meta, session)?);
        }
        for plugin in guard.plugins.iter() {
            ret.extend(plugin.metadata_lint_event(meta, session)?);
        }
        Ok(ret)
    }

    #[allow(dead_code)]
    pub(crate) fn data_as_overlay(&self, meta: &Metadata, data: Bytes, session: &Session) -> Result<Bytes, TransformError> {
        let guard = self.inside_sync.read();
        let mut ret = data;
        for plugin in guard.plugins.iter().rev() {
            ret = plugin.data_as_overlay(meta, ret, session)?;
        }
        for transformer in guard.transformers.iter().rev() {
            ret = transformer.data_as_overlay(meta, ret, session)?;
        }
        Ok(ret)
    }

    #[allow(dead_code)]
    pub(crate) fn data_as_underlay(&self, meta: &mut Metadata, data: Bytes, session: &Session) -> Result<Bytes, TransformError> {
        let guard = self.inside_sync.read();
        let mut ret = data;
        for transformer in guard.transformers.iter() {
            ret = transformer.data_as_underlay(meta, ret, session)?;
        }
        for plugin in guard.plugins.iter() {
            ret = plugin.data_as_underlay(meta, ret, session)?;
        }
        Ok(ret)
    }
    
    #[allow(dead_code)]
    pub async fn count(&self) -> usize {
        self.inside_async.read().await.chain.redo.count()
    }
}