#![allow(unused_imports)]
use error_chain::bail;
use tracing::{debug, error, info, instrument, span, trace, warn, Level};
use tracing_futures::Instrument;

use btreemultimap::BTreeMultiMap;
use multimap::MultiMap;
use tokio::sync::broadcast;

use crate::compact::*;
use crate::conf::*;
use crate::error::*;
use crate::index::*;
use crate::transaction::*;

use fxhash::FxHashSet;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::RwLock as StdRwLock;
use tokio::sync::RwLock;

use crate::engine::TaskEngine;
use crate::event::EventHeader;
use crate::loader::*;
use crate::pipe::*;
use crate::redo::*;
use crate::spec::SerializationFormat;
use crate::spec::TrustMode;
use crate::time::TimeKeeper;
use crate::trust::*;

use crate::trust::ChainKey;

use super::inbox_pipe::*;
use super::workers::ChainWorkProcessor;
use super::*;

impl<'a> Chain {
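    /// Opens (or creates) the chain identified by `key` using the supplied builder.
    ///
    /// This is a thin wrapper around [`Chain::new_ext`] that supplies no extra
    /// loader and tolerates errors raised while processing the loaded events.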
    #[allow(dead_code)]
    pub(crate) async fn new(
        builder: ChainBuilder,
        key: &ChainKey,
        load_integrity: TrustMode,
        idle_integrity: TrustMode,
    ) -> Result<Chain, ChainCreationError> {
        Chain::new_ext(
            builder,
            key.clone(),
            None,
            true,
            load_integrity,
            idle_integrity,
        )
        .await
    }

    #[allow(dead_code)]
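    /// Extended constructor that opens (or creates) the chain identified by `key`.
    ///
    /// * `extra_loader` - optional user-supplied loader appended to the composite
    ///   loader so that it sees every event streamed in from the redo log.
    /// * `allow_process_errors` - when `true`, errors raised while processing the
    ///   loaded events are tolerated instead of aborting the open.
    /// * `load_integrity` / `idle_integrity` - the trust mode used while loading
    ///   and the mode switched to once loading has completed.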
    pub async fn new_ext(
        builder: ChainBuilder,
        key: ChainKey,
        extra_loader: Option<Box<dyn Loader>>,
        allow_process_errors: bool,
        load_integrity: TrustMode,
        idle_integrity: TrustMode,
    ) -> Result<Chain, ChainCreationError> {
        debug!("open: {}", key);

        // Compute the open flags
        #[cfg(feature = "enable_local_fs")]
        let flags = OpenFlags {
            truncate: builder.truncate,
            temporal: builder.temporal,
            integrity: load_integrity,
            read_only: false,
        };
        let compact_mode = builder.cfg_ate.compact_mode;
        let compact_bootstrap = builder.cfg_ate.compact_bootstrap;

        // Create a redo log loader which will listen to all the events as they are
        // streamed in and extract the event headers
        #[cfg(feature = "enable_local_fs")]
        let (loader, mut rx) = RedoLogLoader::new();

        // We create a composite loader that includes any user-defined loader
        let mut composite_loader = Box::new(crate::loader::CompositionLoader::default());
        #[cfg(feature = "enable_local_fs")]
        composite_loader.loaders.push(loader);
        if let Some(a) = extra_loader {
            composite_loader.loaders.push(a);
        }

        // Build the header
        let header = ChainHeader::default();
        let header_bytes = SerializationFormat::Json.serialize(&header)?;

        // Create the redo log itself which will open the files and stream in the events
        // in a background thread
        #[cfg(feature = "enable_local_fs")]
        let redo_log = {
            let key = key.clone();
            let builder = builder.clone();
            async move {
                RedoLog::open_ext(
                    &builder.cfg_ate,
                    &key,
                    flags,
                    composite_loader,
                    header_bytes,
                )
                .await
            }
        };
        #[cfg(not(feature = "enable_local_fs"))]
        let redo_log = { async move { RedoLog::open(header_bytes).await } };

        // While the events are streamed in, we build a list of all the event headers
        // but strip off the data itself
        let process_local = async move {
            #[allow(unused_mut)]
            let mut headers = Vec::new();
            #[cfg(feature = "enable_local_fs")]
            while let Some(result) = rx.recv().await {
                headers.push(result.header.as_header()?);
            }
            Result::<Vec<EventHeader>, SerializationError>::Ok(headers)
        };

        // Join on the redo log and the header collection once the events have been streamed in
        let (redo_log, process_local) = futures::join!(redo_log, process_local);
        let headers = process_local?;
        let redo_log = redo_log?;

        // Construct the chain-of-trust on top of the redo-log
        let chain = ChainOfTrust {
            debug_id: fastrand::u64(..),
            key: key.clone(),
            redo: redo_log,
            timeline: ChainTimeline {
                history: BTreeMultiMap::new(),
                pointers: BinaryTreeIndexer::default(),
                compactors: builder.compactors,
            },
            metrics: Arc::clone(&builder.metrics),
        };

        // Construct all the protected fields that sit behind a synchronous critical
        // section (one that never awaits)
        let mut inside_sync = ChainProtectedSync {
            sniffers: Vec::new(),
            services: Vec::new(),
            indexers: builder.indexers,
            plugins: builder.plugins,
            linters: builder.linters,
            validators: builder.validators,
            transformers: builder.transformers,
            default_session: builder.session,
            integrity: load_integrity,
        };

        // Add a tree authority plugin if one is in the builder
        if let Some(tree) = builder.tree {
            inside_sync.plugins.push(Box::new(tree));
        }

        // Set the integrity mode on all the validators
        inside_sync.set_integrity_mode(load_integrity);

        // Wrap the sync object
        let inside_sync = Arc::new(StdRwLock::new(inside_sync));

        // Create an exit watcher
        let (exit_tx, _) = broadcast::channel(1);

        // The asynchronous critical section protects the chain-of-trust itself and
        // will have longer waits on it when there are writes occurring
        let mut inside_async = ChainProtectedAsync {
            chain,
            default_format: builder.cfg_ate.log_format,
            disable_new_roots: false,
            sync_tolerance: builder.cfg_ate.sync_tolerance,
            listeners: MultiMap::new(),
            is_shutdown: false,
            integrity: load_integrity,
        };

        // Log all the processed events when verbose mode is enabled
        #[cfg(feature = "enable_verbose")]
        for a in headers.iter() {
            match a.meta.get_data_key() {
                Some(key) => debug!("loaded: {} data {}", a.raw.event_hash, key),
                None => debug!("loaded: {}", a.raw.event_hash),
            }
        }

        // Process all the events in the chain-of-trust
        let conversation = Arc::new(ConversationSession::default());
        if let Err(err) =
            inside_async.process(inside_sync.write().unwrap(), headers, Some(&conversation))
        {
            if !allow_process_errors {
                return Err(err);
            }
        }

        // Now switch to the integrity mode we will use after loading
        inside_sync
            .write()
            .unwrap()
            .set_integrity_mode(idle_integrity);

        // Create the compaction state (which we will later pass to the compaction thread)
        let (compact_tx, compact_rx) =
            CompactState::new(compact_mode, inside_async.chain.redo.size() as u64);

        // Wrap the async state in a shared lock so it can be handed to the worker threads
        let inside_async = Arc::new(RwLock::new(inside_async));

        // The worker thread processes events that come in
        let worker_inside_async = Arc::clone(&inside_async);
        let worker_inside_sync = Arc::clone(&inside_sync);

        // background thread - receives events and processes them
        let sender = ChainWorkProcessor::new(worker_inside_async, worker_inside_sync, compact_tx);

        // decache subscription
        let (decache_tx, _) = broadcast::channel(1000);

        // The inbox pipe intercepts incoming requests and processes them
        let mut pipe: Arc<Box<dyn EventPipe>> = Arc::new(Box::new(InboxPipe {
            inbox: sender,
            decache: decache_tx.clone(),
            locks: StdMutex::new(FxHashSet::default()),
        }));
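        // If the builder supplied its own pipe then pair it with the inbox pipe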
        if let Some(second) = builder.pipes {
            pipe = Arc::new(Box::new(DuelPipe::new(second, pipe)));
        }

        // Create the NTP worker that is needed to build the timeline
        let tolerance = builder.configured_for.ntp_tolerance();
        let time = Arc::new(TimeKeeper::new(&builder.cfg_ate, tolerance).await?);

        // Create the chain that will be returned to the caller
        let chain = Chain {
            key: key.clone(),
            node_id: builder.node_id.clone(),
            cfg_ate: builder.cfg_ate.clone(),
            remote_addr: None,
            default_format: builder.cfg_ate.log_format,
            inside_sync,
            inside_async,
            pipe,
            time,
            exit: exit_tx.clone(),
            decache: decache_tx,
            metrics: Arc::clone(&builder.metrics),
            throttle: Arc::clone(&builder.throttle),
        };

        // If we are to compact the log on bootstrap then do so
        debug!("compact-now: {}", compact_bootstrap);
        if compact_bootstrap {
            chain.compact().await?;
        }

        // Start the compactor worker thread on the chain
        if builder.cfg_ate.compact_mode != CompactMode::Never {
            debug!("compact-mode-on: {}", builder.cfg_ate.compact_mode);

            let worker_exit = exit_tx.subscribe();
            let worker_inside_async = Arc::clone(&chain.inside_async);
            let worker_inside_sync = Arc::clone(&chain.inside_sync);
            let worker_pipe = Arc::clone(&chain.pipe);
            let time = Arc::clone(&chain.time);

            // background thread - periodically compacts the chain into a smaller memory footprint
            TaskEngine::spawn(Chain::worker_compactor(
                worker_inside_async,
                worker_inside_sync,
                worker_pipe,
                time,
                compact_rx,
                worker_exit,
            ));
        } else {
            debug!("compact-mode-off: {}", builder.cfg_ate.compact_mode);
        }

        // Return the chain to the caller
        Ok(chain)
    }
}
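
// A minimal sketch of how a caller might open a chain through `Chain::new`.
// The `Chain::new(..)` call matches the constructor signature above; everything
// else (the config, the builder entry point and the chosen `TrustMode` variants)
// is an assumption for illustration and should be checked against the crate's
// public builder API:
//
//     let cfg_ate = ConfAte::default();                  // assumed default config
//     let builder = ChainBuilder::new(&cfg_ate).await?;  // assumed builder entry point
//     let key = ChainKey::from("my-chain");              // assumed key constructor
//     let chain = Chain::new(
//         builder,
//         &key,
//         TrustMode::Distributed,                        // integrity while loading (assumed variant)
//         TrustMode::Distributed,                        // integrity once idle (assumed variant)
//     )
//     .await?;
//
// `chain.compact().await?` could then be called to compact the log manually,
// mirroring the bootstrap compaction performed above.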