ingest 0.1.1

Single and multi-threaded custom ingestion crate for Stellar Futurenet, written in Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
use std::io::{self, Read};
use std::sync::mpsc::{SendError, Sender, SyncSender};
use std::sync::{Arc, Mutex};
use stellar_xdr::next::{LedgerCloseMeta, Limits, Type, TypeVariant};

/// Depth limit handed to `Limits::depth` when decoding XDR in this file;
/// prevents stack overflow
pub const DEFAULT_XDR_RW_DEPTH_LIMIT: u32 = 500;

// from the stellar/go/ingestion lib
/// Capacity in bytes (10 MiB) of the `BufReader` that wraps the input reader.
const META_PIPE_BUFFER_SIZE: usize = 10 * 1024 * 1024;
/// Initial capacity of the vector that caches results in single-thread mode.
const LEDGER_READ_AHEAD_BUFFER_SIZE: usize = 20;

/// Errors produced while constructing or driving a `BufferedLedgerMetaReader`.
#[derive(thiserror::Error, Debug, Clone)]
pub enum BufReaderError {
    /// An unknown type was encountered. Choose from the provided list of valid types.
    #[error("Unknown type {0}, choose one of {1:?}")]
    UnknownType(String, &'static [&'static str]),

    /// Error encountered while decoding XDR data.
    /// The underlying decoder error is not carried along.
    #[error("Error decoding XDR")]
    ReadXdrNext,

    /// Attempted to run single-threaded mode with a specified transmitter, which is unused.
    #[error("Wants to run single-threaded mode but specified transmitter")]
    UnusedTransmitter,

    /// Attempted to run multi-threaded mode without specifying a transmitter.
    ///
    /// `BufferedLedgerMetaReader::new` also returns this variant when both the
    /// plain and the sync transmitter are supplied at the same time.
    #[error("Wants to run multi-threaded mode but no transmitter specified")]
    MissingTransmitter,

    /// Attempted to use single-threaded mode features while in multi-threaded mode.
    #[error("Wants to use single-threaded mode features but is multi-thread mode")]
    WrongModeMultiThread,

    /// Attempted to use multi-threaded mode features while in single-threaded mode.
    #[error("Wants to use multi-threaded mode features but is single-thread mode")]
    WrongModeSingleThread,

    /// Cloned `BufReaders` must only be used for their associated thread mode.
    #[error("Cloned BufReaders must only be used for their thread mode")]
    UsedClonedBufreader,

    /// Error in sending to receiver.
    /// Wraps the `std::sync::mpsc::SendError` together with the result that
    /// could not be delivered.
    #[error("Error while sending meta to receiver {0}")]
    SendError(#[from] SendError<Box<MetaResult>>),

    /// Locking the mutex that guards the cached results failed.
    #[error("Failed to aquire lock")]
    LockError,
}

/// Wrapper struct to hold the `LedgerCloseMeta` data.
///
/// Values are built from a decoded `stellar_xdr::next::Type` through the
/// `From<Type>` implementation.
#[derive(Clone, Debug)]
pub struct LedgerCloseMetaWrapper {
    /// The ledger close metadata associated with this wrapper.
    pub ledger_close_meta: LedgerCloseMeta,
}

impl LedgerCloseMetaWrapper {
    /// Wraps an already decoded `LedgerCloseMeta` value.
    fn new(inner: LedgerCloseMeta) -> Self {
        let ledger_close_meta = inner;
        Self { ledger_close_meta }
    }
}

impl From<Type> for LedgerCloseMetaWrapper {
    /// Unboxes the `LedgerCloseMeta` variant of `Type` into a wrapper.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when `value` is any other variant.
    fn from(value: Type) -> Self {
        if let Type::LedgerCloseMeta(boxed) = value {
            return Self::new(*boxed);
        }

        // Within this crate only `LedgerCloseMeta` values are converted,
        // so every other variant is expected to be unreachable.
        //
        // Note: if you want to implement similar functionality, make sure
        // to assess whether this unreachable fallback is appropriate.
        unreachable!()
    }
}

/// Represents the result of processing ledger metadata.
///
/// The readers in this file fill exactly one of the two fields: the decoded
/// metadata on success, or the error on a decoding failure.
#[derive(Clone, Debug)]
pub struct MetaResult {
    /// The ledger close metadata associated with this result.
    /// `None` when decoding failed.
    pub ledger_close_meta: Option<LedgerCloseMetaWrapper>,

    /// An optional error encountered during processing.
    /// `None` when decoding succeeded.
    pub err: Option<BufReaderError>,
}

/// Enum to indicate the mode of operation for `BufferedLedgerMetaReader`.
///
/// Derives `Debug` so the mode can be logged and compared with `assert_eq!`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BufferedLedgerMetaReaderMode {
    /// The reader operates in single-thread mode: results are cached in the
    /// reader and fetched by the caller.
    SingleThread,

    /// The reader operates in multi-thread mode: results are sent through a
    /// transmitter supplied at construction time.
    MultiThread,
}

/// Struct for reading buffered ledger metadata.
///
/// All fields are private; instances are created through `new` or `clone`.
pub struct BufferedLedgerMetaReader {
    /// The mode of operation for the reader.
    mode: BufferedLedgerMetaReaderMode,

    /// An optional buffered reader for reading data.
    /// This value is set as an option to allow cloning
    /// a `BufferedLedgerMetaReader` for it to be used
    /// to retrieve the mode.
    reader: Option<io::BufReader<Box<dyn Read + Send>>>,

    /// An optional cached vector of metadata results.
    /// This will only be used when running offline.
    cached: Option<Arc<Mutex<Vec<MetaResult>>>>,

    /// An optional transmitter for sending metadata results.
    /// This will only be used when running online
    transmitter: Option<Sender<Box<MetaResult>>>,

    /// An optional sync transmitter.
    /// The blocking multi-thread read prefers it over `transmitter` when set.
    sync_transmitter: Option<SyncSender<Box<MetaResult>>>,

    /// An optional unbounded tokio transmitter, used by the async
    /// multi-thread read.
    async_transmitter: Option<tokio::sync::mpsc::UnboundedSender<Box<MetaResult>>>,

    /// An optional bounded tokio transmitter.
    /// NOTE(review): it is stored but no read method visible in this file
    /// sends through it — confirm whether it is used elsewhere.
    async_transmitter_bounded: Option<tokio::sync::mpsc::Sender<Box<MetaResult>>>,

    /// Indicates whether the reader has been cloned.
    /// A cloned reader is just a lightweight placeholder
    /// reader which is only used to retrieve the mode.
    ///
    /// Cloned readers are only used in multi-thread mode.
    cloned: bool,
}

impl Clone for BufferedLedgerMetaReader {
    /// Builds a placeholder copy of the reader.
    ///
    /// Only the mode is carried over: the reader, the cache and every
    /// transmitter are left empty and the copy is flagged as `cloned`.
    fn clone(&self) -> Self {
        let mode = self.mode.clone();

        Self {
            mode,
            cloned: true,
            reader: None,
            cached: None,
            transmitter: None,
            sync_transmitter: None,
            async_transmitter: None,
            async_transmitter_bounded: None,
        }
    }
}

impl BufferedLedgerMetaReader {
    /// Creates a new `BufferedLedgerMetaReader` instance.
    ///
    /// # Arguments
    ///
    /// * `mode` - The mode of operation for the reader.
    /// * `reader` - The boxed reader used for reading data; it is wrapped in a
    ///   `BufReader` of `META_PIPE_BUFFER_SIZE` bytes.
    /// * `transmitter` - An optional std channel sender for multi-thread mode.
    /// * `sync_transmitter` - An optional std sync channel sender for multi-thread mode.
    /// * `async_transmitter` - An optional unbounded tokio sender for multi-thread mode.
    /// * `async_transmitter_bounded` - An optional bounded tokio sender; it is
    ///   stored as given and takes no part in the validation below.
    ///
    /// # Errors
    ///
    /// * `BufReaderError::MissingTransmitter` when both `transmitter` and
    ///   `sync_transmitter` are supplied, or when multi-thread mode is requested
    ///   with none of `transmitter`, `sync_transmitter` and `async_transmitter`.
    /// * `BufReaderError::UnusedTransmitter` when single-thread mode is requested
    ///   together with `transmitter` or `sync_transmitter`.
    ///
    /// # Returns
    ///
    /// Returns the new reader; in single-thread mode it owns an empty result cache.
    pub fn new(
        mode: BufferedLedgerMetaReaderMode,
        reader: Box<dyn Read + Send>,
        transmitter: Option<std::sync::mpsc::Sender<Box<MetaResult>>>,
        sync_transmitter: Option<SyncSender<Box<MetaResult>>>,
        async_transmitter: Option<tokio::sync::mpsc::UnboundedSender<Box<MetaResult>>>,
        async_transmitter_bounded: Option<tokio::sync::mpsc::Sender<Box<MetaResult>>>,
    ) -> Result<Self, BufReaderError> {
        let buffered = io::BufReader::with_capacity(META_PIPE_BUFFER_SIZE, reader);

        let has_tx = transmitter.is_some();
        let has_sync_tx = sync_transmitter.is_some();

        // The plain and the sync transmitter are mutually exclusive.
        if has_tx && has_sync_tx {
            return Err(BufReaderError::MissingTransmitter);
        }

        // Validate the mode against the transmitters and derive the cache:
        // only single-thread mode keeps results in memory.
        let cached = match mode {
            // Std transmitters have no use in single-thread mode.
            BufferedLedgerMetaReaderMode::SingleThread if has_tx || has_sync_tx => {
                return Err(BufReaderError::UnusedTransmitter);
            }
            BufferedLedgerMetaReaderMode::SingleThread => Some(Arc::new(Mutex::new(
                Vec::with_capacity(LEDGER_READ_AHEAD_BUFFER_SIZE),
            ))),

            // Multi-thread mode needs at least one channel to send through.
            BufferedLedgerMetaReaderMode::MultiThread
                if !has_tx && !has_sync_tx && async_transmitter.is_none() =>
            {
                return Err(BufReaderError::MissingTransmitter);
            }
            BufferedLedgerMetaReaderMode::MultiThread => None,
        };

        Ok(Self {
            mode,
            reader: Some(buffered),
            cached,
            transmitter,
            sync_transmitter,
            async_transmitter,
            async_transmitter_bounded,
            cloned: false,
        })
    }

    /// Retrieves the thread mode of the `BufferedLedgerMetaReader`.
    ///
    /// # Returns
    ///
    /// Returns a reference to the `BufferedLedgerMetaReaderMode` this reader was built with.
    pub fn thread_mode(&self) -> &BufferedLedgerMetaReaderMode {
        &self.mode
    }
}

/// Trait for reading ledger metadata in single-thread mode from a buffered source.
///
/// Results are accumulated by the reader and fetched afterwards with
/// `read_meta`.
pub trait SingleThreadBufferedLedgerMetaReader {
    /// Reads ledger metadata from the buffered source in single-thread mode.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if reading is successful, or a `BufReaderError` if an issue occurs.
    fn single_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError>;

    /// Reads and retrieves cached ledger metadata in single-thread mode.
    ///
    /// # Returns
    ///
    /// Returns a vector of `MetaResult` if retrieval is successful, or a `BufReaderError` if an issue occurs.
    /// The vector is owned by the caller.
    fn read_meta(&self) -> Result<Vec<MetaResult>, BufReaderError>;

    /// Clears the cached buffered ledger metadata in single-thread mode.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if clearing is successful, or a `BufReaderError` if an issue occurs.
    fn clear_buffered(&mut self) -> Result<(), BufReaderError>;
}

/// Trait for reading ledger metadata in multi-thread mode from a buffered source.
///
/// The method returns no data; the implementation in this file delivers each
/// result through a channel transmitter instead.
pub trait MultiThreadBufferedLedgerMetaReader {
    /// Reads ledger metadata from the buffered source in multi-thread mode.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if reading is successful, or a `BufReaderError` if an issue occurs.
    fn multi_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError>;
}

impl SingleThreadBufferedLedgerMetaReader for BufferedLedgerMetaReader {
    fn single_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
            return Err(BufReaderError::WrongModeMultiThread);
        }

        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        let mut reader = self.reader.as_mut().unwrap();
        let mut xdr_reader =
            stellar_xdr::next::Limited::new(&mut reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));
        for t in stellar_xdr::next::Type::read_xdr_framed_iter(
            TypeVariant::LedgerCloseMeta,
            &mut xdr_reader,
        ) {
            let meta_obj = match t {
                Ok(ledger_close_meta) => MetaResult {
                    ledger_close_meta: Some(ledger_close_meta.into()),
                    err: None,
                },

                Err(_) => MetaResult {
                    ledger_close_meta: None,
                    err: Some(BufReaderError::ReadXdrNext),
                },
            };

            // The below unwrap on cached is safe since initialization
            // prevents initializing in the wrong mode and all
            // BufferedLedgerMetaReader fields are private.
            self.cached
                .as_ref()
                .unwrap()
                .lock()
                .map_err(|_| BufReaderError::LockError)?
                .push(meta_obj);
        }

        Ok(())
    }

    fn read_meta(&self) -> Result<Vec<MetaResult>, BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
            return Err(BufReaderError::WrongModeMultiThread);
        }

        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        // The below unwrap on cached is safe since initialization
        // prevents initializing in the wrong mode and all
        // BufferedLedgerMetaReader fields are private.
        let locked = self
            .cached
            .as_ref()
            .unwrap()
            .lock()
            .map_err(|_| BufReaderError::LockError)?;
        Ok((*locked).clone())
    }

    fn clear_buffered(&mut self) -> Result<(), BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::SingleThread {
            return Err(BufReaderError::WrongModeMultiThread);
        }

        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        self.cached = Some(Arc::new(Mutex::new(Vec::with_capacity(
            LEDGER_READ_AHEAD_BUFFER_SIZE,
        ))));
        Ok(())
    }
}

impl MultiThreadBufferedLedgerMetaReader for BufferedLedgerMetaReader {
    /// Decodes every framed `LedgerCloseMeta` available from the reader and
    /// sends one boxed `MetaResult` per frame through the configured std
    /// channel, preferring the sync transmitter over the plain one.
    ///
    /// A frame that fails to decode is sent as a result carrying
    /// `BufReaderError::ReadXdrNext`; it does not abort the call.
    ///
    /// # Errors
    ///
    /// * `WrongModeSingleThread` when the reader is not in multi-thread mode.
    /// * `UsedClonedBufreader` when called on a cloned reader.
    /// * `MissingTransmitter` when neither a sync nor a plain transmitter is
    ///   configured; nothing is read from the source in that case.
    /// * `SendError` when the receiving side of the channel rejects a result.
    fn multi_thread_read_ledger_meta_from_pipe(&mut self) -> Result<(), BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::MultiThread {
            return Err(BufReaderError::WrongModeSingleThread);
        }

        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        // `new` accepts a multi-thread reader that only holds an async
        // transmitter, so neither std channel is guaranteed to exist here.
        // Fail before touching the source instead of panicking mid-stream.
        let sync_tx = self.sync_transmitter.as_ref();
        let plain_tx = self.transmitter.as_ref();
        if sync_tx.is_none() && plain_tx.is_none() {
            return Err(BufReaderError::MissingTransmitter);
        }

        // Only clones lack a reader, and those were rejected above.
        let reader = self
            .reader
            .as_mut()
            .expect("non-cloned readers always own their reader");
        let mut xdr_reader =
            stellar_xdr::next::Limited::new(reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));
        for t in stellar_xdr::next::Type::read_xdr_framed_iter(
            TypeVariant::LedgerCloseMeta,
            &mut xdr_reader,
        ) {
            let meta_obj = Box::new(match t {
                Ok(ledger_close_meta) => MetaResult {
                    ledger_close_meta: Some(ledger_close_meta.into()),
                    err: None,
                },

                Err(_) => MetaResult {
                    ledger_close_meta: None,
                    err: Some(BufReaderError::ReadXdrNext),
                },
            });

            match (sync_tx, plain_tx) {
                (Some(tx), _) => tx.send(meta_obj)?,
                (None, Some(tx)) => tx.send(meta_obj)?,
                // Ruled out by the check above; kept as an error rather than a panic.
                (None, None) => return Err(BufReaderError::MissingTransmitter),
            }
        }

        Ok(())
    }
}

impl BufferedLedgerMetaReader {
    pub async fn async_multi_thread_read_ledger_meta_from_pipe(
        &mut self,
    ) -> Result<(), BufReaderError> {
        if self.mode != BufferedLedgerMetaReaderMode::MultiThread {
            return Err(BufReaderError::WrongModeSingleThread);
        }

        if self.cloned {
            return Err(BufReaderError::UsedClonedBufreader);
        }

        let mut reader = self.reader.as_mut().unwrap();
        let mut xdr_reader =
            stellar_xdr::next::Limited::new(&mut reader, Limits::depth(DEFAULT_XDR_RW_DEPTH_LIMIT));
        for t in stellar_xdr::next::Type::read_xdr_framed_iter(
            TypeVariant::LedgerCloseMeta,
            &mut xdr_reader,
        ) {
            println!("\n\n Inner got result");
            let meta_obj = match t {
                Ok(ledger_close_meta) => MetaResult {
                    ledger_close_meta: Some(ledger_close_meta.into()),
                    err: None,
                },

                Err(_) => MetaResult {
                    ledger_close_meta: None,
                    err: Some(BufReaderError::ReadXdrNext),
                },
            };

            if let Some(tx) = self.async_transmitter.as_ref() {
                let transmit = tx.send(Box::new(meta_obj));

                if transmit.is_err() {
                    log::error!(
                        "Failed to transmit ledger close: {:?}",
                        transmit.err().unwrap()
                    )
                }
            }
        }

        Ok(())
    }
}