repo-stream 0.4.0

Fast and robust atproto CAR file processing
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
//! Consume a CAR from an AsyncRead, producing an ordered stream of records

use crate::{
    Bytes, HashMap,
    disk::{DiskError, DiskStore},
    mst::MstNode,
    walk::Output,
};
use cid::Cid;
use iroh_car::CarReader;
use std::convert::Infallible;
use tokio::{io::AsyncRead, sync::mpsc};

use crate::mst::Commit;
use crate::walk::{WalkError, Walker};

/// Errors that can happen while consuming and emitting blocks and records
#[derive(Debug, thiserror::Error)]
pub enum DriveError {
    #[error("Error from iroh_car: {0}")]
    CarReader(#[from] iroh_car::Error),
    #[error("Failed to decode commit block: {0}")]
    BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
    #[error("The Commit block reference by the root was not found")]
    MissingCommit,
    #[error("Failed to walk the mst tree: {0}")]
    WalkError(#[from] WalkError),
    #[error("CAR file had no roots")]
    MissingRoot,
    #[error("Storage error")]
    StorageError(#[from] DiskError),
    #[error("Tried to send on a closed channel")]
    ChannelSendError, // SendError takes <T> which we don't need
    #[error("Failed to join a task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
}

/// An in-order chunk of Rkey + CID + (processed) Block
pub type BlockChunk = Vec<Output>;

/// A block buffered either raw (possible MST node) or already processed
#[derive(Debug, Clone)]
pub(crate) enum MaybeProcessedBlock {
    /// A block that's *probably* a Node (but we can't know yet)
    ///
    /// It *can be* a record that suspiciously looks a lot like a node, so we
    /// cannot eagerly turn it into a Node. We only know for sure what it is
    /// when we actually walk down the MST
    Raw(Bytes),
    /// A processed record from a block that was definitely not a Node
    ///
    /// Processing has to be fallible because the CAR can have totally-unused
    /// blocks, which can just be garbage. Since we're eagerly trying to process
    /// record blocks without knowing for sure that they *are* records, we
    /// discard any definitely-not-nodes that fail processing and keep their
    /// error in the buffer for them. If we later try to retrieve them as a
    /// record, then we can surface the error.
    ///
    /// If we _never_ needed this block, then we may have wasted a bit of effort
    /// trying to process it. Oh well.
    ///
    /// There's an alternative here, which would be to kick unprocessable blocks
    /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
    /// surface the typed error later if needed by trying to reprocess.
    Processed(Bytes),
}

impl MaybeProcessedBlock {
    /// Classify a block, eagerly processing it if it cannot be an MST node
    ///
    /// Blocks that pass the cheap `MstNode::could_be` fingerprint stay `Raw`
    /// (they might be a node *or* a record that merely resembles one); anything
    /// else is definitely not a node and is run through `process` right away.
    pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
        if MstNode::could_be(&data) {
            MaybeProcessedBlock::Raw(data)
        } else {
            MaybeProcessedBlock::Processed(process(data))
        }
    }

    /// Byte length of the buffered data, used to track the memory budget
    pub(crate) fn len(&self) -> usize {
        match self {
            MaybeProcessedBlock::Raw(b) | MaybeProcessedBlock::Processed(b) => b.len(),
        }
    }

    /// Serialize for disk storage by appending a one-byte variant tag
    ///
    /// `0x00` marks `Raw`, `0x01` marks `Processed`. Inverse of `from_bytes`.
    pub(crate) fn into_bytes(self) -> Bytes {
        match self {
            MaybeProcessedBlock::Raw(mut b) => {
                b.push(0x00);
                b
            }
            MaybeProcessedBlock::Processed(mut b) => {
                b.push(0x01);
                b
            }
        }
    }

    /// Deserialize from disk storage by popping the trailing variant tag
    ///
    /// # Panics
    ///
    /// Panics if `b` is empty; every value we store is produced by
    /// `into_bytes`, which always appends a tag byte, so an empty buffer means
    /// the store handed back something we never wrote.
    pub(crate) fn from_bytes(mut b: Bytes) -> Self {
        let tag = b
            .pop()
            .expect("stored block must end with a variant tag byte");
        match tag {
            0x00 => MaybeProcessedBlock::Raw(b),
            0x01 => MaybeProcessedBlock::Processed(b),
            // unknown tags should be impossible for data we wrote ourselves;
            // keep the historical behaviour of reading them as Processed
            _ => MaybeProcessedBlock::Processed(b),
        }
    }
}

/// Read a CAR file, buffering blocks in memory or to disk
///
/// Produced by [`Driver::load_car`]; which variant you get depends on whether
/// the (processed) blocks fit under the configured memory limit.
pub enum Driver<R: AsyncRead + Unpin> {
    /// All blocks fit within the memory limit
    ///
    /// You probably want to check the commit's signature. You can go ahead and
    /// walk the MST right away.
    Memory(Commit, MemDriver),
    /// Blocks exceed the memory limit
    ///
    /// You'll need to provide a disk storage to continue. The commit will be
    /// returned and can be validated only once all blocks are loaded.
    Disk(NeedDisk<R>),
}

/// Processor that just returns the raw blocks
///
/// Use this as the block processor when you want unmodified block bytes
/// emitted for each record.
#[inline]
pub fn noop(block: Bytes) -> Bytes {
    block
}

/// Builder-style driver setup
#[derive(Debug, Clone)]
pub struct DriverBuilder {
    /// In-memory buffer limit, in MiB, before spilling to disk (default: 16)
    pub mem_limit_mb: usize,
    /// Transform applied eagerly to each definitely-not-a-node block
    pub block_processor: fn(Bytes) -> Bytes,
}

impl Default for DriverBuilder {
    fn default() -> Self {
        Self {
            mem_limit_mb: 16,
            block_processor: noop,
        }
    }
}

impl DriverBuilder {
    /// Begin configuring the driver with defaults
    pub fn new() -> Self {
        Default::default()
    }

    /// Set the in-memory size limit, in MiB
    ///
    /// Default: 16 MiB
    pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
        self.mem_limit_mb = new_limit;
        self
    }

    /// Set the block processor
    ///
    /// Default: noop, raw blocks will be emitted
    // return type spelled `Self` for consistency with `with_mem_limit_mb`
    pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> Self {
        self.block_processor = new_processor;
        self
    }

    /// Begin processing an atproto MST from a CAR file
    ///
    /// See [`Driver::load_car`] for the `Memory`/`Disk` outcome semantics.
    pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> {
        Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
    }
}

impl<R: AsyncRead + Unpin> Driver<R> {
    /// Begin processing an atproto MST from a CAR file
    ///
    /// Blocks will be loaded, processed, and buffered in memory. If the entire
    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
    /// will be returned along with a `Commit` ready for validation.
    ///
    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
    /// partial state will be returned as `Driver::Disk(needed)`, which can be
    /// resumed by providing a `SqliteStorage` for on-disk block storage.
    pub async fn load_car(
        reader: R,
        process: fn(Bytes) -> Bytes,
        mem_limit_mb: usize,
    ) -> Result<Driver<R>, DriveError> {
        // MiB -> bytes
        let max_size = mem_limit_mb * 2_usize.pow(20);
        let mut mem_blocks = HashMap::new();

        let mut car = CarReader::new(reader).await?;

        // only the first root is used; extra roots are ignored
        let root = *car
            .header()
            .roots()
            .first()
            .ok_or(DriveError::MissingRoot)?;
        log::debug!("root: {root:?}");

        let mut commit = None;

        // try to load all the blocks into memory
        let mut mem_size = 0;
        while let Some((cid, data)) = car.next_block().await? {
            // the root commit is a Special Third Kind of block that we need to make
            // sure not to optimistically send to the processing function
            if cid == root {
                let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
                commit = Some(c);
                continue;
            }

            // remaining possible types: node, record, other. optimistically process
            let maybe_processed = MaybeProcessedBlock::maybe(process, data);

            // stash (maybe processed) blocks in memory as long as we have room
            mem_size += maybe_processed.len();
            mem_blocks.insert(cid, maybe_processed);
            if mem_size >= max_size {
                // over budget: hand back the partial state so the caller can
                // resume with disk storage via NeedDisk::finish_loading
                return Ok(Driver::Disk(NeedDisk {
                    car,
                    root,
                    process,
                    max_size,
                    mem_blocks,
                    commit,
                }));
            }
        }

        // all blocks loaded and we fit in memory! hopefully we found the commit...
        let commit = commit.ok_or(DriveError::MissingCommit)?;

        // the commit always must point to a Node; empty node => empty MST special case
        let root_node: MstNode = match mem_blocks
            .get(&commit.data)
            .ok_or(DriveError::MissingCommit)?
        {
            // a block that was eagerly processed cannot be the root node:
            // processing only happens to definitely-not-node blocks
            MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
            MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
        };
        let walker = Walker::new(root_node);

        Ok(Driver::Memory(
            commit,
            MemDriver {
                blocks: mem_blocks,
                walker,
                process,
            },
        ))
    }
}

/// The core driver between the block stream and MST walker
///
/// In the future, PDSs will export CARs in a stream-friendly order that will
/// enable processing them with tiny memory overhead. But that future is not
/// here yet.
///
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
/// optimistic stream features: we load all blocks first, then walk the MST.
///
/// This makes things much simpler: we only need to worry about spilling to disk
/// in one place, and we always have a reasonable expectation about how much
/// work the init function will do. We can drop the CAR reader before walking,
/// so the sync/async boundaries become a little easier to work around.
#[derive(Debug)]
pub struct MemDriver {
    // all non-commit blocks from the CAR, keyed by CID
    blocks: HashMap<Cid, MaybeProcessedBlock>,
    // walk state over the MST rooted at the commit's data node
    walker: Walker,
    // processor applied to record blocks that were kept Raw
    process: fn(Bytes) -> Bytes,
}

impl MemDriver {
    /// Step through the record outputs, in rkey order
    ///
    /// Returns up to `n` outputs per call, or `None` once the walk is
    /// exhausted.
    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
        let mut chunk = Vec::with_capacity(n);
        while chunk.len() < n {
            // walk as far as we can until we run out of blocks or find a record
            match self.walker.step(&mut self.blocks, self.process)? {
                Some(output) => chunk.push(output),
                None => break,
            }
        }
        Ok(if chunk.is_empty() { None } else { Some(chunk) })
    }
}

/// A partially memory-loaded car file that needs disk spillover to continue
pub struct NeedDisk<R: AsyncRead + Unpin> {
    // the CAR reader, paused mid-stream where the memory budget ran out
    car: CarReader<R>,
    // CID of the root commit block (may not have been seen yet)
    root: Cid,
    // processor applied to definitely-not-node blocks
    process: fn(Bytes) -> Bytes,
    // memory budget in bytes, reused to size disk-bound chunks
    max_size: usize,
    // blocks buffered in memory before the limit was hit
    mem_blocks: HashMap<Cid, MaybeProcessedBlock>,
    // the commit, if its block already appeared in the stream
    pub commit: Option<Commit>,
}

impl<R: AsyncRead + Unpin> NeedDisk<R> {
    /// Load the remaining blocks, spilling everything into `store`
    ///
    /// First dumps the already-buffered in-memory blocks to disk, then streams
    /// the rest of the CAR into the store in bounded chunks through a blocking
    /// worker task. On success returns the `Commit` (found at the latest
    /// during this pass) and a `DiskDriver` ready to walk the MST from disk.
    ///
    /// Errors if the CAR stream, decoding, or storage fails, or if no commit
    /// block was ever found.
    pub async fn finish_loading(
        mut self,
        mut store: DiskStore,
    ) -> Result<(Commit, DiskDriver), DriveError> {
        // move store in and back out so we can manage lifetimes
        // dump mem blocks into the store
        store = tokio::task::spawn(async move {
            let kvs = self
                .mem_blocks
                .into_iter()
                .map(|(k, v)| (k.to_bytes(), v.into_bytes()));

            store.put_many(kvs)?;
            Ok::<_, DriveError>(store)
        })
        .await??;

        // capacity-1 channel: at most one chunk queued while the worker writes
        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1);

        // blocking worker owns the store and drains chunks into it
        let store_worker = tokio::task::spawn_blocking(move || {
            while let Some(chunk) = rx.blocking_recv() {
                let kvs = chunk
                    .into_iter()
                    .map(|(k, v)| (k.to_bytes(), v.into_bytes()));
                store.put_many(kvs)?;
            }
            Ok::<_, DriveError>(store)
        }); // await later

        // dump the rest to disk (in chunks)
        log::debug!("dumping the rest of the stream...");
        loop {
            let mut mem_size = 0;
            let mut chunk = vec![];
            loop {
                let Some((cid, data)) = self.car.next_block().await? else {
                    break;
                };
                // we still gotta keep checking for the root since we might not have it
                if cid == self.root {
                    let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
                    self.commit = Some(c);
                    continue;
                }

                let data = Bytes::from(data);

                // remaining possible types: node, record, other. optimistically process
                // TODO: get the actual in-memory size to compute disk spill
                let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
                mem_size += maybe_processed.len();
                chunk.push((cid, maybe_processed));
                if mem_size >= (self.max_size / 2) {
                    // soooooo if we're setting the db cache to max_size and then letting
                    // multiple chunks in the queue that are >= max_size, then at any time
                    // we might be using some multiple of max_size?
                    break;
                }
            }
            if chunk.is_empty() {
                // stream exhausted (inner loop broke without buffering anything)
                break;
            }
            tx.send(chunk)
                .await
                .map_err(|_| DriveError::ChannelSendError)?;
        }
        // closing the channel lets the worker's recv loop end
        drop(tx);
        log::debug!("done. waiting for worker to finish...");

        store = store_worker.await??;

        log::debug!("worker finished.");

        let commit = self.commit.ok_or(DriveError::MissingCommit)?;

        // the commit always must point to a Node; empty node => empty MST special case
        let db_bytes = store
            .get(&commit.data.to_bytes())
            .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
            .ok_or(DriveError::MissingCommit)?;

        let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) {
            // an eagerly-processed block cannot be the root node
            MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
            MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?,
        };
        let walker = Walker::new(node);

        Ok((
            commit,
            DiskDriver {
                process: self.process,
                state: Some(BigState { store, walker }),
            },
        ))
    }
}

/// The disk store paired with the walker that reads from it
///
/// Kept together so `DiskDriver` can move both into a blocking task as a unit.
struct BigState {
    store: DiskStore,
    walker: Walker,
}

/// MST walker that reads from disk instead of an in-memory hashmap
pub struct DiskDriver {
    // processor applied to record blocks that were stored Raw
    process: fn(Bytes) -> Bytes,
    // None only transiently while a blocking task owns the state,
    // or for the doctest-only fake driver
    state: Option<BigState>,
}

// for doctests only: a driver with no backing state, never actually driven
#[doc(hidden)]
pub fn _get_fake_disk_driver() -> DiskDriver {
    DiskDriver {
        state: None,
        process: noop,
    }
}

impl DiskDriver {
    /// Walk the MST returning up to `n` rkey + record pairs
    ///
    /// Returns `None` once the walk is exhausted. Disk reads happen inside a
    /// `spawn_blocking` task; the walker/store state is moved in and restored
    /// afterwards on every path except a task join failure.
    ///
    /// ```no_run
    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DriveError> {
    /// # let mut disk_driver = _get_fake_disk_driver();
    /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
    ///     for output in pairs {
    ///         println!("{}: size={}", output.rkey, output.data.len());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
        let process = self.process;

        // state should only *ever* be None transiently while inside here
        let mut state = self.state.take().expect("DiskDriver must have Some(state)");

        // the big pain here is that we don't want to leave self.state in an
        // invalid state (None), so all the error paths have to make sure it
        // comes out again.
        let (state, res) =
            tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) {
                let mut out = Vec::with_capacity(n);

                for _ in 0..n {
                    // walk as far as we can until we run out of blocks or find a record
                    let step = match state.walker.disk_step(&mut state.store, process) {
                        Ok(s) => s,
                        Err(e) => {
                            // return state alongside the error so it can be restored
                            return (state, Err(e.into()));
                        }
                    };
                    let Some(output) = step else {
                        break;
                    };
                    out.push(output);
                }

                (state, Ok::<_, DriveError>(out))
            })
            .await?; // on tokio JoinError, we'll be left with invalid state :(

        // *must* restore state before dealing with the actual result
        self.state = Some(state);

        let out = res?;

        if out.is_empty() {
            Ok(None)
        } else {
            Ok(Some(out))
        }
    }

    // Blocking-thread body for `to_channel`: walk the MST and push chunks of
    // up to `n` outputs (or the first walk error) through `tx` until the walk
    // ends or the receiver is dropped.
    fn read_tx_blocking(
        &mut self,
        n: usize,
        tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
    ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
        let BigState { store, walker } = self.state.as_mut().expect("valid state");

        loop {
            let mut out: BlockChunk = Vec::with_capacity(n);

            for _ in 0..n {
                // walk as far as we can until we run out of blocks or find a record

                let step = match walker.disk_step(store, self.process) {
                    Ok(s) => s,
                    // forward the error to the consumer, then stop
                    Err(e) => return tx.blocking_send(Err(e.into())),
                };

                let Some(output) = step else {
                    break;
                };
                out.push(output);
            }

            if out.is_empty() {
                // walk exhausted
                break;
            }
            tx.blocking_send(Ok(out))?;
        }

        Ok(())
    }

    /// Spawn the disk reading task into a tokio blocking thread
    ///
    /// The idea is to avoid so much sending back and forth to the blocking
    /// thread, letting a blocking task do all the disk reading work and sending
    /// records and rkeys back through an `mpsc` channel instead.
    ///
    /// This might also allow the disk work to continue while processing the
    /// records. It's still not yet clear if this method actually has much
    /// benefit over just using `.next_chunk(n)`.
    ///
    /// ```no_run
    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DriveError> {
    /// # let mut disk_driver = _get_fake_disk_driver();
    /// let (mut rx, join) = disk_driver.to_channel(512);
    /// while let Some(recvd) = rx.recv().await {
    ///     let pairs = recvd?;
    ///     for output in pairs {
    ///         println!("{}: size={}", output.rkey, output.data.len());
    ///     }
    ///
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn to_channel(
        mut self,
        n: usize,
    ) -> (
        mpsc::Receiver<Result<BlockChunk, DriveError>>,
        tokio::task::JoinHandle<Self>,
    ) {
        // capacity 1: the reader stays at most one chunk ahead of the consumer
        let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);

        // sketch: this worker is going to be allowed to execute without a join handle
        let chan_task = tokio::task::spawn_blocking(move || {
            if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
                // receiver dropped: not an error, the consumer just stopped early
                log::debug!("big car reader exited early due to dropped receiver channel");
            }
            self
        });

        (rx, chan_task)
    }

    /// Reset the disk storage so it can be reused.
    ///
    /// The store is returned, so it can be reused for another `DiskDriver`.
    pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
        let BigState { store, .. } = self.state.take().expect("valid state");
        store.reset().await?;
        Ok(store)
    }
}