noosphere_sphere/replication/
car.rs

1use anyhow::Result;
2use async_stream::try_stream;
3use bytes::Bytes;
4use cid::Cid;
5use futures_util::sink::SinkExt;
6use iroh_car::{CarHeader, CarWriter};
7use std::io::{Error as IoError, ErrorKind as IoErrorKind};
8use tokio::sync::mpsc::channel;
9use tokio_stream::Stream;
10use tokio_util::{
11    io::{CopyToBytes, SinkWriter},
12    sync::PollSender,
13};
14
15/// Takes a list of roots and a stream of blocks (pairs of [Cid] and
16/// corresponding [Vec<u8>]), and produces an async byte stream that yields a
17/// valid [CARv1](https://ipld.io/specs/transport/car/carv1/)
18pub fn car_stream<S>(
19    mut roots: Vec<Cid>,
20    block_stream: S,
21) -> impl Stream<Item = Result<Bytes, IoError>> + Send
22where
23    S: Stream<Item = Result<(Cid, Vec<u8>)>> + Send,
24{
25    if roots.is_empty() {
26        roots = vec![Cid::default()]
27    }
28
29    try_stream! {
30        let (tx, mut rx) = channel::<Bytes>(16);
31        let sink =
32            PollSender::new(tx).sink_map_err(|error| {
33                error!("Failed to send CAR frame: {}", error);
34                IoError::from(IoErrorKind::BrokenPipe)
35            });
36
37        let mut car_buffer = SinkWriter::new(CopyToBytes::new(sink));
38        let car_header = CarHeader::new_v1(roots);
39        let mut car_writer = CarWriter::new(car_header, &mut car_buffer);
40        let mut sent_blocks = false;
41
42        for await item in block_stream {
43            sent_blocks = true;
44            let (cid, block) = item.map_err(|error| {
45                error!("Failed to stream blocks: {}", error);
46                IoError::from(IoErrorKind::BrokenPipe)
47            })?;
48
49            car_writer.write(cid, block).await.map_err(|error| {
50                error!("Failed to write CAR frame: {}", error);
51                IoError::from(IoErrorKind::BrokenPipe)
52            })?;
53
54            car_writer.flush().await.map_err(|error| {
55                error!("Failed to flush CAR frames: {}", error);
56                IoError::from(IoErrorKind::BrokenPipe)
57            })?;
58
59            while let Ok(block) = rx.try_recv() {
60                yield block;
61            }
62       }
63
64       if !sent_blocks {
65            car_writer.write_header().await.map_err(|error| {
66                error!("Failed to write CAR frame: {}", error);
67                IoError::from(IoErrorKind::BrokenPipe)
68            })?;
69            car_writer.flush().await.map_err(|error| {
70                error!("Failed to flush CAR frames: {}", error);
71                IoError::from(IoErrorKind::BrokenPipe)
72            })?;
73
74            while let Ok(block) = rx.try_recv() {
75                yield block;
76            }
77       }
78    }
79}