1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
use tokio::sync::RwLock; use parking_lot::RwLock as StdRwLock; #[allow(unused_imports)] use std::sync::mpsc as smpsc; #[allow(unused_imports)] use std::sync::{Weak, Arc}; use crate::session::{Session}; use super::meta::*; use super::error::*; use super::chain::*; use super::pipe::*; use super::trust::*; use super::header::*; #[allow(unused_imports)] use super::event::*; use super::lint::*; use super::spec::*; use super::index::*; use bytes::Bytes; #[derive(Clone)] pub struct ChainMultiUser where Self: Send + Sync { pub(super) inside_async: Arc<RwLock<ChainProtectedAsync>>, pub(super) inside_sync: Arc<StdRwLock<ChainProtectedSync>>, pub(super) pipe: Arc<dyn EventPipe>, pub(super) default_format: MessageFormat, } impl ChainMultiUser { pub(crate) async fn new(accessor: &Chain) -> ChainMultiUser { ChainMultiUser { inside_async: Arc::clone(&accessor.inside_async), inside_sync: Arc::clone(&accessor.inside_sync), pipe: Arc::clone(&accessor.pipe), default_format: accessor.default_format, } } #[allow(dead_code)] pub async fn load(&self, leaf: EventLeaf) -> Result<LoadResult, LoadError> { self.inside_async.read().await.chain.load(leaf).await } #[allow(dead_code)] pub async fn load_many(&self, leafs: Vec<EventLeaf>) -> Result<Vec<LoadResult>, LoadError> { self.inside_async.read().await.chain.load_many(leafs).await } #[allow(dead_code)] pub async fn lookup_primary(&self, key: &PrimaryKey) -> Option<EventLeaf> { self.inside_async.read().await.chain.lookup_primary(key) } #[allow(dead_code)] pub async fn lookup_secondary(&self, key: &MetaCollection) -> Option<Vec<EventLeaf>> { self.inside_async.read().await.chain.lookup_secondary(key) } #[allow(dead_code)] pub(crate) fn metadata_lint_many<'a>(&self, lints: &Vec<LintData<'a>>, session: &Session) -> Result<Vec<CoreMetadata>, LintError> { let guard = self.inside_sync.read(); let mut ret = Vec::new(); for linter in guard.linters.iter() { ret.extend(linter.metadata_lint_many(lints, session)?); } for plugin in guard.plugins.iter() { ret.extend(plugin.metadata_lint_many(lints, session)?); } Ok(ret) } #[allow(dead_code)] pub(crate) fn metadata_lint_event(&self, meta: &mut Metadata, session: &Session) -> Result<Vec<CoreMetadata>, LintError> { let guard = self.inside_sync.read(); let mut ret = Vec::new(); for linter in guard.linters.iter() { ret.extend(linter.metadata_lint_event(meta, session)?); } for plugin in guard.plugins.iter() { ret.extend(plugin.metadata_lint_event(meta, session)?); } Ok(ret) } #[allow(dead_code)] pub(crate) fn data_as_overlay(&self, meta: &Metadata, data: Bytes, session: &Session) -> Result<Bytes, TransformError> { let guard = self.inside_sync.read(); let mut ret = data; for plugin in guard.plugins.iter().rev() { ret = plugin.data_as_overlay(meta, ret, session)?; } for transformer in guard.transformers.iter().rev() { ret = transformer.data_as_overlay(meta, ret, session)?; } Ok(ret) } #[allow(dead_code)] pub(crate) fn data_as_underlay(&self, meta: &mut Metadata, data: Bytes, session: &Session) -> Result<Bytes, TransformError> { let guard = self.inside_sync.read(); let mut ret = data; for transformer in guard.transformers.iter() { ret = transformer.data_as_underlay(meta, ret, session)?; } for plugin in guard.plugins.iter() { ret = plugin.data_as_underlay(meta, ret, session)?; } Ok(ret) } #[allow(dead_code)] pub async fn count(&self) -> usize { self.inside_async.read().await.chain.redo.count() } }